Partilhar via


FAQ

Perguntas frequentes sobre o uso do Kafka com o Azure Databricks.

Porque é que recebo um erro de que uma opção Kafka não é suportada ou não é reconhecida?

Um erro comum é esquecer o kafka. prefixo ao definir opções de configuração nativas Kafka. Todas as opções passadas diretamente ao cliente Kafka devem ser prefixadas com kafka.:

# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")

# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")

Opções específicas para o conector Spark Kafka (como subscribe, startingOffsets, maxOffsetsPerTrigger) não precisam do prefixo. Consulte Opções para a lista completa.

Porque é que recebo um erro sobre as classes de Kafka sombreadas?

Azure Databricks requer o uso de classes de Kafka sombreadas (prefixadas com kafkashaded. ou shadedmskiam.). Se vires erros como RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED, deves usar os nomes das classes sombreadas:

  • org.apache.kafka.* As aulas exigem o kafkashaded. prefixo. Por exemplo: kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule
  • software.amazon.msk.* As aulas exigem o shadedmskiam. prefixo. Por exemplo: shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule

Porque é que estou a receber um TimeoutException ao ligar-me ao Kafka?

As causas comuns incluem:

  • Conectividade de rede: O cluster de computação não consegue alcançar os brokers Kafka. Verifique regras de firewall, grupos de segurança e configurações de VPC.
  • Servidores bootstrap errados: Verifique se o kafka.bootstrap.servers nome do host e a porta estão corretos.
  • Resolução DNS: Assegure que os nomes de host do broker Kafka possam ser resolvidos na rede Azure Databricks.
  • Questões de SSL/TLS: Se estiver a usar SSL, verifique se os certificados estão corretamente configurados.

Para configurações de peering Private Link ou VPC, certifique-se de que as rotas de rede corretas estão implementadas.

Devo usar o modo batch ou de streaming para o Kafka?

Depende do teu caso de uso:

  • Modo de streaming (spark.readStream): Use quando precisa de processamento contínuo de dados ou ingestão de baixa latência.
  • Modo batch (spark.read): Usado para cargas de dados únicas, backfills ou depuração. Requer tanto startingOffsets como endingOffsets.

Consulte Configurar intervalos de disparo de streaming estruturado para detalhes sobre a configuração de intervalos de disparo como AvailableNow, ProcessingTime, e modo em tempo real.

Posso ler vários tópicos de Kafka num único stream?

Sim, pode usar:

  • subscribe: Forneça uma lista separada por vírgulas de tópicos, por exemplo .option("subscribe", "topic1,topic2").
  • subscribePattern: Use um padrão de regex Java para corresponder aos nomes dos tópicos, por exemplo .option("subscribePattern", "topic-.*").

Como posso usar o Kafka com os Pipelines Declarativos Lakeflow Spark?

O Lakeflow Spark Declarative Pipelines fornece suporte nativo para fontes Kafka. Pode definir uma tabela de streaming que lê a partir do Kafka:

Python

import dlt

@dlt.table
def kafka_bronze():
  return (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:port>")
    .option("subscribe", "<topic>")
    .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:port>',
  subscribe => '<topic>'
);

Consulte Carregar dados em pipelines para mais detalhes sobre fontes de streaming nos Lakeflow Spark Declarative Pipelines.

Como posso desserializar as colunas de chave e valor de Kafka?

As key colunas e value são devolvidas como binárias (BINARY tipo ). Use as operações do DataFrame para deserializar os dados com base no seu formato.

Porque é que estou a receber um erro de escrita idempotente?

O Databricks Runtime 13.3 LTS e superior inclui uma versão mais recente da kafka-clients biblioteca que permite gravações idempotentes por padrão. Se o seu cluster Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas mas sem IDEMPOTENT_WRITE ativadas, a escrita falha com: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Resolva esse erro atualizando para Kafka versão 2.8.0 ou superior, ou definindo .option("kafka.enable.idempotence", "false") ao configurar seu gravador de Streaming Estruturado.

O que é KAFKA_DATA_LOSS_ERROR e como é que resolvo isto?

Este erro ocorre quando a origem do Kafka deteta que os offsets armazenados no ponto de verificação já não estão disponíveis no Kafka, normalmente porque:

  • O stream foi suspenso por mais tempo do que o período de retenção de Kafka.
  • Os dados do tema Kafka foram apagados ou o tema foi recriado.
  • O corretor Kafka sofreu perda de dados.

Para resolver:

  • Se a perda de dados for aceitável: Defina .option("failOnDataLoss", "false") para permitir que o fluxo de dados continue a partir do ponto de deslocamento mais antigo disponível.
  • Se a perda de dados não for aceitável: Redefinir o ponto de verificação e reprocessar a partir dos offsets earliest, ou restaurar os dados de Kafka ausentes.

Consulte KAFKA_DATA_LOSS condição de erro para mais informações.

Como posso controlar a taxa a que os dados são lidos pelo Kafka?

Utilize a opção maxOffsetsPerTrigger para limitar o número de offsets (aproximadamente o número de registos) processados por micro-lote. Isto ajuda a evitar grandes lotes que possam sobrecarregar o processamento a jusante ou causar problemas de memória ao recuperar um atraso.

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:port>")
  .option("subscribe", "<topic>")
  .option("maxOffsetsPerTrigger", 10000)
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:port>")
  .option("subscribe", "<topic>")
  .option("maxOffsetsPerTrigger", 10000)
  .load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:port>',
  subscribe => '<topic>',
  maxOffsetsPerTrigger => '10000'
);

Alternativamente, use opções como minPartitions ou maxRecordsPerPartition para controlar quantas partições Spark são criadas para cada lote.

Como posso monitorizar o atraso do meu stream em relação aos offsets mais recentes do Kafka?

Use as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest disponíveis no progresso da consulta em streaming. Estes relatórios indicam quantos deslocamentos o seu fluxo está atrasado em relação ao último deslocamento disponível, em todas as partições de tópicos subscritas. Consulte Monitorização de consultas de streaming estruturado no Azure Databricks.

Também pode usar estimatedTotalBytesBehindLatest para estimar o total de bytes de dados que ainda não foram processados.

Porque é que as minhas métricas de offset de lag do Kafka mostram valores persistentemente não nulos depois de atualizar para o Databricks Runtime 17.1?

No Databricks Runtime 17.1 e superiores, os offsets de Kafka mais recentes são obtidos após cada micro-lote concluído. Em tópicos que recebem dados continuamente, as métricas de backlog podem mostrar valores pequenos e persistentes não nulos. Este é um comportamento esperado e não indica que o fluxo esteja a ficar para trás.

No Databricks Runtime 17.0 e inferiores, os offsets Kafka mais recentes são obtidos no início do micro-batch. As métricas de backlog podem regressar 0 quando as consultas de streaming consomem consistentemente todos os registos disponíveis no início do micro-batch.

Se os valores forem grandes ou crescerem continuamente, o fluxo pode não estar a acompanhar os dados recebidos. Consulte Monitorização de consultas de streaming estruturado no Azure Databricks.

Porque é que a inicialização do meu stream Kafka é lenta?

Fluxos de Kafka requerem tempo para:

  1. Ligue-se ao cluster Kafka e obtenha metadados.
  2. Descubra partições de tópicos.
  3. Buscar os offsets iniciais.

Para clusters Kafka on-premises ou remotos, a latência da rede pode impactar significativamente o tempo de inicialização. Se estiver a executar pipelines acionados/agendados com reinicializações frequentes, considere usar o modo de streaming contínuo para evitar sobrecarga de inicialização repetida.

Porque é que adicionar mais executores Spark não aumenta o meu débito Kafka?

Quando os corretores Kafka ficam saturados, adicionar mais executores Spark aumenta o custo sem aumentar o throughput.

Sinais de que Kafka é o gargalo:

  • O rendimento estagna apesar de adicionar mais núcleos.
  • A utilização do CPU ou da rede do broker Kafka é elevada.
  • As tarefas do Spark são concluídas rapidamente, mas aguardam novos dados.

Para resolver isto, escale o seu cluster Kafka adicionando brokers ou aumentando o número de partições para distribuir a carga.

Como posso otimizar o custo e a utilização computacional do streaming Kafka?

Para os modos micro-batch e AvailableNow:

  • Ajuste o tamanho correto do seu cluster: Monitorize métricas e defina um tamanho fixo de cluster apropriado para a carga máxima.
  • Usar maxOffsetsPerTrigger: Limitar o tamanho dos lotes para controlar o uso de recursos durante picos de carga.
  • Evite o autoescalonamento: Os trabalhos de streaming funcionam continuamente, e adicionar ou remover nós causa sobrecarga de reequilíbrio de tarefas.
  • Reduzir o desvio de dados: Partições desajustadas fazem com que algumas tarefas processem significativamente mais dados do que outras, levando a retardos que atrasam a conclusão geral do lote e desperdiçam recursos computacionais em tarefas ociosas. Use a minPartitions opção de dividir grandes partições Kafka em partições Spark mais pequenas para um processamento mais equilibrado.

Para o modo em tempo real, o dimensionamento correto é especialmente importante porque as tarefas podem permanecer inativas enquanto aguardam dados. Considerações-chave:

  • Define maxPartitions para que cada tarefa gere múltiplas partições Kafka para reduzir a sobrecarga.
  • Sintonize spark.sql.shuffle.partitions para trabalhos com muita aleatoriedade.

Consulte Compute sizing para orientações sobre o dimensionamento de clusters para o modo em tempo real.

Porque é que o meu stream não está a devolver registos, mesmo que haja dados no tópico?

As causas comuns incluem:

  • Configuração erradastartingOffsets: O valor padrão é latest, que só lê os dados novos que chegam após o início do fluxo. Defina startingOffsets como earliest para ler os dados existentes.
  • Nome do tema errado: Verifique se está a subscrever o tema correto.
  • Problemas de autenticação: O seu fluxo pode ter-se ligado com sucesso, mas não tem permissões para ler do tópico. Verifique as suas ACLs do Kafka.
  • Expiração do deslocamento: Se o seu fluxo foi interrompido por um período prolongado e os deslocamentos no ponto de controlo expiraram (foram eliminados pela retenção do Kafka), pode ser necessário reiniciar o ponto de controlo ou ajustar as configurações correspondentes failOnDataLoss.