Condividi tramite


Connettersi ad Apache Kafka

Questo articolo descrive come usare Apache Kafka come origine o sink durante l'esecuzione di carichi di lavoro Structured Streaming in Azure Databricks.

Per altre informazioni su Kafka, vedere la documentazione di Apache Kafka.

Leggere dati da Kafka

Azure Databricks fornisce la kafka parola chiave come formato dati per configurare le connessioni a Kafka. Di seguito è riportato un esempio per una lettura in streaming:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks supporta anche la semantica di lettura batch, come illustrato nell'esempio seguente:

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

Per il caricamento batch incrementale, Databricks consiglia di usare Kafka con Trigger.AvailableNow. Vedere AvailableNow: Elaborazione batch incrementale.

In Databricks Runtime 13.3 LTS e versioni successive, Azure Databricks fornisce anche una funzione SQL per la lettura dei dati Kafka. Lo streaming con SQL è supportato solo nelle pipeline dichiarative di Lakeflow Spark o con tabelle di streaming in Databricks SQL. Vedere read_kafka funzione con valori di tabella.

Configurare il lettore di streaming strutturato Kafka

L'opzione seguente deve essere impostata per l'origine Kafka sia per le query batch che per le query di streaming:

Opzione Valore Descrizione
kafka.bootstrap.servers Elenco delimitato da virgole di host:port Server bootstrap del cluster Kafka

Inoltre, è necessaria una delle opzioni seguenti per specificare gli argomenti da sottoscrivere:

Opzione Valore Descrizione
subscribe Elenco di argomenti delimitato da virgole. Elenco di argomenti a cui sottoscrivere.
subscribePattern Stringa di espressione regolare di Java. Modello utilizzato per iscriversi agli argomento/i.
assign Stringa JSON {"topicA":[0,1],"topic":[2,4]}. Partizioni di argomenti specifiche da utilizzare.

Vedere la pagina Opzioni per l'elenco completo delle opzioni disponibili.

Schema dei Kafka record

I record restituiti dal lettore Kafka Structured Streaming avranno lo schema seguente:

colonna Tipo
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

key e value vengono sempre deserializzati come matrici di byte con ByteArrayDeserializer. Usare operazioni dataframe (ad esempio cast("string") o from_avro) per deserializzare in modo esplicito le chiavi e i valori.

Scrivere dati in Kafka

Di seguito è riportato un esempio di scrittura di streaming in Kafka:

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks supporta anche la semantica di scrittura batch nei sink di dati Kafka, come illustrato nell'esempio seguente:

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

Configurare lo scrittore di streaming strutturato Kafka

Importante

Databricks Runtime 13.3 LTS e versioni successive include una versione più recente della libreria kafka-clients che abilita le scritture idempotenti per impostazione predefinita. Se un sink Kafka usa la versione 2.8.0 o successiva con ACL configurati, ma senza IDEMPOTENT_WRITE abilitato, la scrittura non riesce con il messaggio di errore org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Risolvere questo errore eseguendo l'aggiornamento a Kafka versione 2.8.0 o successiva oppure impostando .option(“kafka.enable.idempotence”, “false”) durante la configurazione del writer Structured Streaming.

Di seguito sono riportate le opzioni comuni configurate durante la scrittura su Kafka:

Opzione Valore Valore predefinito Descrizione
kafka.boostrap.servers Elenco delimitato da virgole di <host:port> none [Necessario] Configurazione Kafka bootstrap.servers.
topic STRING non impostato [Facoltativo] Imposta l'argomento per tutte le righe da scrivere. Questa opzione esegue l'override di qualsiasi colonna di argomento presente nei dati.
includeHeaders BOOLEAN false [Opzionale] Specifica se includere le intestazioni di Kafka nella riga.

Vedere la pagina Opzioni per l'elenco completo delle opzioni disponibili.

Schema per il writer Kafka

Quando si scrivono dati in Kafka, il dataframe fornito può includere i campi seguenti:

Nome della colonna Obbligatorio o facoltativo Tipo
key opzionale STRING oppure BINARY
value required STRING oppure BINARY
headers opzionale ARRAY
topic facoltativo (ignorato se topic è impostato come opzione di scrittura) STRING
partition opzionale INT

Autenticazione

Azure Databricks supporta più metodi di autenticazione per Kafka, tra cui le credenziali del servizio Unity Catalog, SASL/SSL e le opzioni specifiche del cloud per AWS MSK, Hub eventi di Azure e Google Cloud Managed Kafka. Vedere Autenticazione.

Ottenere le metriche di Kafka

È possibile monitorare il ritardo di una query di streaming rispetto a Kafka usando le avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest metriche. Questi report segnalano il ritardo medio, massimo e minimo di offset in tutte le partizioni di argomento sottoscritte, rispetto agli offset più recenti in Kafka. Vedere Lettura interattiva delle metriche.

Per stimare la quantità di dati che la query non ha ancora utilizzato, usare la estimatedTotalBytesBehindLatest metrica . Questa metrica stima il numero totale di byte rimanenti in tutte le partizioni sottoscritte in base ai batch elaborati negli ultimi 300 secondi. È possibile modificare l'intervallo di tempo usato per questa stima impostando l'opzione bytesEstimateWindowLength . Ad esempio, per impostarlo su 10 minuti:

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

Se stai eseguendo il flusso in un notebook, è possibile visualizzare queste metriche nella scheda Dati non elaborati nel dashboard dei progressi delle query di streaming.

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Per ulteriori informazioni, vedere Monitoraggio delle query Structured Streaming su Azure Databricks.

Esempio di codice: da Kafka a Delta

Nell'esempio seguente viene illustrato un flusso di lavoro completo per lo streaming continuo dei dati da Kafka a una tabella Delta. Questo modello è ideale per carichi di lavoro di inserimento dati quasi in tempo reale.

Questo esempio usa uno schema JSON fisso. Per altri formati come Avro o Protobuf, usare from_avro o from_protobuf. È anche possibile eseguire l'integrazione con un registro schemi. Consultare Esempio con Registro Schemi.

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);