Fluxo do Apache Pulsar

Importante

Esta funcionalidade está em Pré-visualização Pública.

No Databricks Runtime 14.1 e superior, você pode usar o Streaming Estruturado para transmitir dados do Apache Pulsar no Azure Databricks.

O Streaming Estruturado fornece semântica de processamento exatamente uma vez para dados lidos de fontes do Pulsar.

Exemplo de sintaxe

A seguir está um exemplo básico do uso do Structured Streaming para ler do Pulsar:

Python

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
)

Scala

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Para ler tópicos do Pulsar, deve fornecer uma service.url e uma das seguintes opções:

  • topic
  • topics
  • topicsPattern

Para obter uma lista completa de opções, consulte Configurar opções para a transmissão em fluxo do Pulsar.

Autenticar-se no Pulsar

O Azure Databricks suporta autenticação com truststore e keystore no Pulsar. O Databricks recomenda que use segredos para armazenar detalhes de configuração.

As opções de configuração de stream disponíveis incluem as seguintes:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Se o fluxo usar um PulsarAdmin, deve definir as seguintes opções:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Example

O exemplo a seguir demonstra a configuração de opções de autenticação:

Python

client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")

# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", starting_offsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", client_auth_params)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trust_store_path)
  .option("pulsar.client.tlsTrustStorePassword", client_pw)
  .load()
)

Scala

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Esquema pulsar

Quando se lê do Pulsar, o esquema das linhas depende dos esquemas dos tópicos da fonte.

  • Para tópicos com esquema Avro ou JSON, os nomes e tipos de campo são preservados no Spark DataFrame resultante.
  • Para tópicos sem esquema ou com um tipo de dados simples no Pulsar, a carga útil é carregada numa coluna value.
  • Se configurares o stream para ler vários tópicos com esquemas diferentes, define allowDifferentTopicSchemas para carregar o conteúdo bruto numa value coluna.

Os registros Pulsar têm os seguintes campos de metadados:

Coluna Tipo
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Configurar opções para leitura de streaming do Pulsar

Configure todas as opções seguintes com a sintaxe .option("<optionName>", "<optionValue>") para fluxos de leitura. Também pode configurar a autenticação usando .options(). Consulte Autenticar-se no Pulsar.

A tabela a seguir descreve as configurações necessárias para o Pulsar. Você deve especificar apenas uma das opções topicou topicstopicsPattern.

Opção Valor predefinido Descrição
service.url nenhum A configuração do Pulsar serviceUrl para o serviço Pulsar.
topic nenhum Uma sequência de caracteres com o nome do tópico a consumir.
topics nenhum Uma lista separada por vírgulas dos tópicos a consumir.
topicsPattern nenhum Uma string de regex Java para corresponder em tópicos a serem consumidos.

A tabela a seguir descreve outras opções suportadas para o Pulsar:

Opção Valor predefinido Descrição
predefinedSubscription nenhum O nome de assinatura predefinido usado pelo conector para acompanhar o progresso do aplicativo spark.
subscriptionPrefix nenhum Um prefixo usado pelo conector para gerar uma assinatura aleatória para acompanhar o progresso da aplicação Spark.
pollTimeoutMs 120000 O tempo limite para ler mensagens do Pulsar em milissegundos.
waitingForNonExistedTopic false Se o conector deve esperar até que os tópicos desejados sejam criados.
failOnDataLoss true Controla se uma consulta deve ser falhada quando os dados são perdidos (por exemplo, tópicos são excluídos ou mensagens são excluídas devido à política de retenção).
allowDifferentTopicSchemas false Se forem lidos vários tópicos com esquemas diferentes, use esta opção para desativar a desserialização automática dos valores dos tópicos com base no esquema. Somente os valores brutos são retornados quando isso é true.
startingOffsets latest Se latest, o leitor lê os registos mais recentes depois de iniciar a execução. Se earliest, o leitor lê desde o primeiro offset. Também podes especificar uma string JSON para um deslocamento específico.
maxBytesPerTrigger nenhum Um limite suave para o número máximo de bytes a processar por micro-lote. Se especificar esta opção, também deve especificar admin.url.
admin.url nenhum A configuração do Pulsar serviceHttpUrl. É obrigatório quando maxBytesPerTrigger é especificado.

Você também pode especificar qualquer configuração de cliente, administrador e leitor do Pulsar usando os seguintes padrões:

Padrão Opções de configuração
pulsar.client.* Configuração do cliente Pulsar
pulsar.admin.* Configuração de administração do Pulsar
pulsar.reader.* Configuração do leitor de pulsar

Construir deslocamentos iniciais JSON

Para usar um ID de mensagem personalizado que especifique um deslocamento, como JSON, com a startingOffsets opção, veja o seguinte exemplo:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()