Opções

Esta página descreve opções de configuração para ler e escrever no Apache Kafka usando Structured Streaming no Azure Databricks.

O conector Kafka do Azure Databricks é construído sobre o conector Apache Spark Kafka e suporta todas as opções de configuração padrão do Kafka. Qualquer opção com prefixo é kafka. passada diretamente para o cliente Kafka subjacente. Por exemplo, .option("kafka.max.poll.records", "500") define a propriedade do max.poll.records consumidor de Kafka. Consulte a documentação de configuração de Kafka para a lista completa de propriedades Kafka disponíveis.

Para uma lista completa de opções de fonte e sink de Streaming Estruturado, consulte Kafka e o Guia de Integração Structured Streaming + Kafka.

Opções necessárias

Para detalhes sobre as opções necessárias, consulte Kafka.

A seguinte opção é necessária tanto para leitura como para escrita:

Key Descrição
kafka.bootstrap.servers Uma lista separada por vírgulas de endereços host:port para corretores Kafka. Define a propriedade do bootstrap.servers cliente Kafka.
Se encontrar que não há dados da Kafka, verifique esta lista de endereços do corretor para moradas incorretas. Se a lista de endereços do corretor estiver incorreta, pode não haver erros. Os clientes Kafka assumem que os corretores estarão disponíveis eventualmente e tentam novamente para sempre quando recebem erros de rede.

Para leituras de Kafka, deve também especificar exatamente uma das seguintes opções para identificar quais os tópicos a seguir:

  • subscribe
  • subscribePattern
  • assign

Ao escrever para Kafka, pode opcionalmente definir a topic opção para especificar um tópico de destino para todas as linhas. Se não estiver definido, o DataFrame deve incluir uma topic coluna.

Opções de leitores comuns

As seguintes opções são frequentemente usadas ao ler Kafka:

Key Descrição
minPartitions O número mínimo de partições a ler do Kafka.
maxRecordsPerPartition O número máximo de registos por partição Spark.
failOnDataLoss Se deve falhar a consulta quando é possível que os dados tenham sido perdidos.
maxOffsetsPerTrigger O número máximo de deslocamentos processados por intervalo de gatilho.
startingOffsets O deslocamento a partir do qual a consulta inicia a leitura.
endingOffsets Onde parar de ler para consultas em lote.
groupIdPrefix O prefixo personalizado para o ID de grupo de consumidores gerado automaticamente.
kafka.group.id O ID de grupo para usar enquanto lês do Kafka.
Use isto com cautela, pois pode causar comportamentos inesperados. Por padrão, cada consulta gera um ID de grupo exclusivo para leitura de dados. Isto garante que cada consulta tem o seu próprio grupo de consumidores que evita interferências de outros consumidores e permite que cada consulta leia todas as partições dos seus tópicos subscritos. Em alguns cenários, como a autorização baseada em grupos Kafka, pode usar IDs de grupo autorizados específicos para ler dados.
Consultas com o mesmo ID de grupo podem interferir entre si e ler apenas dados parciais. A interferência pode ocorrer quando executa cargas de trabalho em lote e streaming simultâneas, ou quando inicia e reinicia consultas em rápida sucessão.
Para minimizar problemas, defina a configuração session.timeout.ms Kafka para consumidores como muito pequena.
includeHeaders Se deve incluir cabeçalhos de mensagens Kafka na saída.
bytesEstimateWindowLength A janela de tempo usada para estimar bytes restantes através da estimatedTotalBytesBehindLatest métrica.

Para uma lista completa de opções de fonte e sink de Streaming Estruturado, consulte Kafka e o Guia de Integração Structured Streaming + Kafka.

Opções comuns de escritor

As seguintes opções são frequentemente usadas ao escrever para Kafka:

Key Descrição
topic Define o tema para todas as linhas. Isto tem precedência sobre qualquer topic coluna dos dados.
includeHeaders Se deve incluir cabeceamentos de Kafka na fila.

Importante

O Databricks Runtime 13.3 LTS e superior inclui uma versão mais recente da kafka-clients biblioteca que permite gravações idempotentes por padrão. Se o seu sink Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas mas sem IDEMPOTENT_WRITE ativadas, as escritas falharão. Resolva isto atualizando para Kafka 2.8.0 ou superior, ou definindo .option("kafka.enable.idempotence", "false").

Para uma lista completa de opções de fonte e sink de Streaming Estruturado, consulte Kafka e o Guia de Integração Structured Streaming + Kafka.

Opções de autenticação

O Azure Databricks suporta múltiplos métodos de autenticação para Kafka, incluindo credenciais de serviço Unity Catalog, SASL/SSL e opções específicas da cloud para AWS MSK, Hubs de Eventos do Azure e Google Cloud Managed Kafka.

O Azure Databricks recomenda que utilize credenciais de serviço do Catálogo Unity para autenticação a serviços Kafka geridos na cloud:

Option Descrição
databricks.serviceCredential O nome de uma credencial de serviço do Unity Catalog para autenticação a serviços Kafka geridos na cloud (AWS MSK, Hubs de Eventos do Azure ou Google Cloud Managed Kafka). Disponível no Databricks Runtime 16.1 e superior.
databricks.serviceCredential.scope O âmbito OAuth para a credencial de serviço. Só defina isto se o Azure Databricks não conseguir inferir automaticamente o âmbito do seu serviço Kafka.

Quando utiliza uma credencial de serviço do Unity Catalog, não precisa de especificar opções SASL/SSL como kafka.sasl.mechanism, kafka.sasl.jaas.config, ou kafka.security.protocol.

Opções comuns de SASL/SSL incluem:

Option Descrição
kafka.security.protocol O protocolo usado para comunicar com corretores (por exemplo, SASL_SSL, SSL, PLAINTEXT).
kafka.sasl.mechanism O mecanismo SASL (por exemplo, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER, AWS_MSK_IAM).
kafka.sasl.jaas.config A cadeia de configuração de login do JAAS.
kafka.sasl.login.callback.handler.class O nome da classe totalmente qualificado de um handler de retorno de login para autenticação SASL.
kafka.sasl.client.callback.handler.class O nome da classe totalmente qualificado de um handler de callback de cliente para autenticação SASL.
kafka.ssl.truststore.location A localização do ficheiro de armazenamento de confiança SSL.
kafka.ssl.truststore.password A palavra-passe do ficheiro de armazenamento de confiança SSL.
kafka.ssl.keystore.location A localização do ficheiro de armazenamento da chave SSL.
kafka.ssl.keystore.password A senha para o arquivo de armazenamento de chaves SSL.

Para instruções completas de configuração da autenticação, consulte Autenticação.

Recursos adicionais