Exportieren von Langfuse-Ablaufverfolgungen in Azure Databricks MLflow

Konfigurieren Sie Langfuse, um OpenTelemetry-basierte Trace-Spans an den Azure Databricks MLflow OTLP-Endpunkt zu senden, damit die Spuren in Unity-Katalogtabellen zusammen mit Ihren anderen MLflow-Traces gespeichert werden.

Das Konsolidieren von Ablaufverfolgungen in Azure Databricks bietet die folgenden Vorteile:

  • Abfragen und Vergleichen von Langfuse-instrumentierten Aufrufen zusammen mit Ablaufverfolgungen aus anderen Frameworks an einer zentralen Stelle.
  • Verwenden Sie Azure Databricks SQL, um Ablaufverfolgungsdaten im großen Maßstab zu analysieren.
  • Wenden Sie die Governance des Unity-Katalogs, wie z.B. Zugriffssteuerungen und Herkunft, auf alle Ihre Spuren an.

Anforderungen

Schritt 1: Installieren von Paketen

Installieren Sie die erforderlichen Pakete in Ihrem Azure Databricks-Notizbuch:

%pip install "langfuse>=3.14.5" "mlflow[databricks]>=3.10.0" opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-http
%restart_python

Schritt 2: Deaktivieren der Langfuse-Tracesammlung

Langfuse erfordert die Umgebungsvariablen LANGFUSE_HOST, LANGFUSE_PUBLIC_KEY und LANGFUSE_SECRET_KEY, um das SDK zu initialisieren. Setzen Sie diese Variablen auf Dummywerte, damit Traces nicht an einen Langfuse-Server gesendet werden. Nur der OTEL-Exporter, der in Add the OTLP exporter konfiguriert ist, erhält die spans.

import os

os.environ["LANGFUSE_HOST"] = "localhost"
os.environ["LANGFUSE_PUBLIC_KEY"] = ""
os.environ["LANGFUSE_SECRET_KEY"] = ""

Hinweis

Dies ist der einfachste Ansatz, um zu verhindern, dass Langfuse Traces auf seinen eigenen Servern sammelt. Alternativ können Sie LANGFUSE_TRACING_ENABLED=False so einstellen, dass die eingebaute Tracing-Funktion von Langfuse deaktiviert wird.

Schritt 3: Konfigurieren der Azure Databricks-Verbindung

Rufen Sie die Arbeitsbereichs-Host-URL und ein API-Token ab. In einem Azure Databricks-Notizbuch können Sie diese aus dem Notizbuchkontext abrufen:

# If running outside a Databricks notebook, set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables manually.
context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
DATABRICKS_HOST = context.apiUrl().get().rstrip("/")
DATABRICKS_TOKEN = context.apiToken().get()

Definieren Sie Ihren Unity-Katalogkatalog und das Schema, und verknüpfen Sie sie dann mit einem MLflow-Experiment.set_experiment_trace_location Dadurch wird Azure Databricks mitgeteilt, wo die eingehenden Traces gespeichert werden sollen.

import mlflow
from mlflow.entities import UCSchemaLocation
from mlflow.tracing.enablement import set_experiment_trace_location

CATALOG_NAME = "<UC_CATALOG_NAME>"
SCHEMA_NAME = "<UC_SCHEMA_NAME>"
EXPERIMENT_ID = "<EXPERIMENT_ID>"

trace_location = set_experiment_trace_location(
    location=UCSchemaLocation(catalog_name=CATALOG_NAME, schema_name=SCHEMA_NAME),
    experiment_id=EXPERIMENT_ID,
)

Schritt 5: Hinzufügen des OTLP-Exporters von Azure Databricks

Rufen Sie das TracerProvider ab, das Langfuse verwendet, und fügen Sie dann ein BatchSpanProcessor mit einem OTLPSpanExporter hinzu, das auf den OTLP-Endpunkt von Azure Databricks verweist.

from langfuse import get_client
from opentelemetry import trace as otel_trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Initialize the Langfuse client. Langfuse registers its own TracerProvider as the global OpenTelemetry TracerProvider.
langfuse = get_client()

# Retrieve the global TracerProvider.
# Use this to attach an additional span processor that forwards Langfuse spans to Databricks.
provider = otel_trace.get_tracer_provider()

# Configure the OTLP exporter to send spans to the Databricks MLflow endpoint.
# The X-Databricks-UC-Table-Name header routes incoming spans to the Unity Catalog table defined by the trace location.
databricks_exporter = OTLPSpanExporter(
    endpoint=f"{DATABRICKS_HOST}/api/2.0/otel/v1/traces",
    headers={
        "content-type": "application/x-protobuf",
        "Authorization": f"Bearer {DATABRICKS_TOKEN}",
        "X-Databricks-UC-Table-Name": trace_location.full_otel_spans_table_name,
    },
)

# Attach the Databricks exporter to Langfuse's provider. Because the Langfuse
# env vars are set to dummy values, this processor is the only active exporter,
# so all spans are sent exclusively to Databricks.
provider.add_span_processor(BatchSpanProcessor(databricks_exporter))

Schritt 6: Eine protokollierte Funktion ausführen

Verwenden Sie Langfuse's @observe() Decorator, um Ihre Funktionen zu instrumentieren. Der Dekorateur erstellt OTEL-Spans, die vom Exporter an Azure Databricks gesendet werden.

from langfuse import observe

@observe()
def my_llm_call(prompt: str) -> str:
    # Replace with your LLM logic (OpenAI, Anthropic, etc.)
    return f"Response to: {prompt}"

@observe()
def my_pipeline(user_input: str) -> str:
    result = my_llm_call(user_input)
    return result

my_pipeline("Hello, world!")

Schritt 7: Ablaufverfolgungen anzeigen

Öffnen Sie das MLflow-Experiment in Ihrem Azure Databricks-Arbeitsbereich, und klicken Sie auf die Registerkarte " Ablaufverfolgungen ". Die Ablaufverfolgung aus dem my_pipeline Anruf sollte angezeigt werden.

Nächste Schritte