Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Esta página descreve como pode usar o Apache Kafka como fonte ou como sink ao executar cargas de trabalho de Structured Streaming no Azure Databricks.
Para mais informações sobre Kafka, consulte a documentação do Apache Kafka.
Ler dados de Kafka
Use o kafka formato para configurar ligações ao Kafka. Segue-se um exemplo de leitura em streaming:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
SQL
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
O Azure Databricks também suporta leituras em lote do Kafka, como no seguinte exemplo:
Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Scala
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
Para carregamento incremental em lote, o Databricks recomenda o uso do Kafka com Trigger.AvailableNow. Ver AvailableNow: Processamento em lote incremental.
No Databricks Runtime 13.3 LTS e superiores, o Azure Databricks também fornece uma função SQL para ler dados Kafka. O streaming com SQL é suportado apenas no Lakeflow Spark Declarative Pipelines ou com tabelas de streaming no Databricks SQL. Consulte read_kafka função de valor de tabela.
Configurar o leitor de Streaming Estruturado do Kafka
Para consultas em lote e streaming, deve definir os servidores bootstrap para a fonte Kafka com a seguinte opção:
| Key | valor | Descrição |
|---|---|---|
kafka.bootstrap.servers |
Uma lista separada por vírgulas de host:port | Os servidores bootstrap do cluster Kafka |
Para definir tópicos de subscrição, deve especificar uma das seguintes opções:
| Option | valor | Descrição |
|---|---|---|
subscribe |
Uma lista de tópicos separados por vírgula. | A lista de tópicos para se inscrever. |
subscribePattern |
Cadeia de caracteres regex Java. | O padrão usado para se inscrever no(s) tópico(s). |
assign |
Cadeia de caracteres JSON {"topicA":[0,1],"topic":[2,4]}. |
Específico topicPartitions para consumir. |
Consulte a página de Opções para a lista completa de opções disponíveis.
Esquema das linhas de Kafka
O leitor Kafka Structured Streaming devolve as linhas com o seguinte esquema:
| Column | Tipo |
|---|---|
key |
binary |
value |
binary |
topic |
string |
partition |
int |
offset |
long |
timestamp |
long |
timestampType |
int |
O key e o value são sempre desserializados como matrizes de bytes com o ByteArrayDeserializer. Use operações DataFrame (como cast("string") ou from_avro) para desserializar explicitamente as chaves e valores.
Gravar dados em Kafka
Segue-se um exemplo de uma gravação de streaming para Kafka:
Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Scala
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
O Azure Databricks também suporta semântica de escrita em lote para sumidouros de dados Kafka, como mostrado no seguinte exemplo:
Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Scala
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
Configurar o gravador de Kafka Structured Streaming
Importante
O Databricks Runtime 13.3 LTS e superior inclui uma versão mais recente da kafka-clients biblioteca que permite gravações idempotentes por padrão. Se um coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com a mensagem org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state de erro.
Resolva esse erro atualizando para Kafka versão 2.8.0 ou superior, ou definindo .option(“kafka.enable.idempotence”, “false”) ao configurar seu gravador de Streaming Estruturado.
As seguintes são opções comuns para escritas em Kafka:
| Key | valor | Valor predefinido | Descrição |
|---|---|---|---|
kafka.boostrap.servers |
Uma lista separada por vírgulas de <host:port> |
nenhuma | Required. A configuração do Kafka bootstrap.servers. |
topic |
STRING |
não definido | Optional. Define o tema para todas as linhas a serem escritas. Esta opção substitui qualquer coluna de tópico que exista nos dados. |
includeHeaders |
BOOLEAN |
false |
Optional. Se os cabeçalhos de Kafka devem ser incluídos na linha. |
Consulte a página de Opções para a lista completa de opções disponíveis.
Esquema para escritor Kafka
Ao escrever dados para Kafka, o DataFrame fornecido pode incluir os seguintes campos:
| Nome da coluna | Obrigatório ou opcional | Tipo |
|---|---|---|
key |
optional |
STRING ou BINARY |
value |
required |
STRING ou BINARY |
headers |
optional | ARRAY |
topic |
opcional (ignorado se topic estiver definido como opção de gravador) |
STRING |
partition |
optional | INT |
Autenticação
O Azure Databricks suporta múltiplos métodos de autenticação para Kafka, incluindo credenciais de serviço do Unity Catalog, SASL/SSL e opções específicas da cloud para AWS MSK, Hubs de Eventos do Azure e Google Cloud Managed Kafka. Consulte Autenticação.
Recuperar métricas de Kafka
Para monitorizar o atraso em relação ao Kafka para uma consulta de streaming, utilize as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest. Estas métricas indicam o desfasamento de offset médio, máximo e mínimo em todas as partições dos tópicos subscritos, em relação aos offsets mais recentes no Kafka. Consulte Leitura de métricas interativamente.
Observação
No Databricks Runtime 17.1 e superiores, os offsets de Kafka mais recentes são obtidos após cada micro-lote concluído. Em tópicos que recebem dados continuamente, as métricas de backlog podem mostrar valores pequenos e persistentes não nulos. Este é um comportamento esperado e não indica que o fluxo esteja a ficar para trás.
No Databricks Runtime 17.0 e inferiores, os offsets Kafka mais recentes são obtidos no início do micro-batch. As métricas de backlog podem regressar 0 quando as consultas de streaming consomem consistentemente todos os registos disponíveis no início do micro-batch.
Para estimar os dados que faltam ler numa consulta, use a métrica estimatedTotalBytesBehindLatest. Esta métrica estima o número total de bytes restantes em todas as partições subscritas com base nos lotes processados nos últimos 300 segundos. Pode modificar a janela de tempo usada para esta estimativa definindo a bytesEstimateWindowLength opção.
Por exemplo, para definir a duração da janela para 10 minutos:
Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Scala
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
Se estiver a executar o fluxo num notebook, pode ver estas métricas na guia Dados Brutos no painel de progresso da consulta de streaming.
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Consulte Monitorização de Consultas de Transmissão Estruturada no Azure Databricks para mais informações.
Exemplo de Kafka a Delta Lake
O exemplo seguinte mostra um fluxo de trabalho completo para transmitir continuamente dados de Kafka para uma tabela Delta Lake. Pode usar esta abordagem para cargas de trabalho de ingestão de dados quase em tempo real.
Este exemplo utiliza um esquema JSON fixo. Para outros formatos como Avro ou Protobuf, use from_avro ou from_protobuf. Também podes integrar com um registo de esquemas. Veja exemplo com o Registo de Esquemas.
Python
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
Scala
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
SQL
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);