Inscrever-se no Google Pub/Sub

Use o conector incorporado para subscrever o Google Pub/Sub. Este conector tem semântica de processamento exatamente uma vez para linhas provenientes do subscritor.

Nota

Pub/Sub pode publicar linhas duplicadas, ou linhas podem chegar ao assinante fora de ordem. Tens de escrever código para lidar com linhas duplicadas e fora de ordem.

Configurar um stream Pub/Sub

O exemplo de código seguinte mostra como configurar uma leitura de Streaming Estruturado a partir de Pub/Sub e autenticar com chaves privadas.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Para obter mais opções de configuração, consulte Configurar opções de leitura de streaming do Pub/Sub.

Configurar o acesso a Pub/Sub

As suas credenciais devem ter as seguintes funções:

Funções Obrigatório ou opcional Como o papel é utilizado
roles/pubsub.viewer ou roles/viewer Necessário Verifica se existe subscrição e recebe subscrição.
roles/pubsub.subscriber Necessário Recolhe dados de uma subscrição.
roles/pubsub.editor ou roles/editor Opcional Permite a criação de uma subscrição caso não exista e permite o uso de deleteSubscriptionOnStreamStop para eliminar subscrições na terminação do fluxo.

O Databricks recomenda que uses segredos ao usar chaves. As seguintes opções são necessárias para autorizar uma conexão:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Compreenda o esquema Pub/Sub

O esquema do fluxo corresponde às linhas obtidas a partir do Pub/Sub, conforme descrito na tabela seguinte:

Campo Tipo
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurar opções para leitura de streaming Pub/Sub

A tabela a seguir descreve as opções suportadas para Pub/Sub. Todas as opções estão configuradas no .option("<optionName>", "<optionValue>") seu leitor de stream.

Nota

Algumas opções de configuração Pub/Sub usam o conceito de buscas em vez de microlotes. Este é um detalhe interno de implementação, e as opções funcionam de forma semelhante a outros conectores de Structured Streaming, exceto que as linhas são recolhidas e depois processadas.

Key Valor predefinido Descrição
numFetchPartitions Defina como metade do número de executores presentes na inicialização do fluxo. O número de tarefas paralelas do Spark que vão buscar linhas de uma subscrição.
deleteSubscriptionOnStreamStop false Se true, a assinatura passada para o fluxo é excluída quando o trabalho de streaming termina.
maxBytesPerTrigger none Um limite suave para o tamanho do lote a ser processado durante cada microlote acionado.
maxRecordsPerFetch 1000 O número de linhas a buscar por tarefa antes de processar as linhas.
maxFetchPeriod 10s A duração de tempo de que cada tarefa dispõe para recolher linhas antes de as processar. Aceita uma sequência de duração, por exemplo, 1s durante 1 segundo ou 1m 1 minuto. O Databricks recomenda o uso do valor padrão.

Use processamento incremental em lotes com Pub/Sub

Podes usar Trigger.AvailableNow para consumir linhas disponíveis das fontes Pub/Sub como um lote incremental.

O Azure Databricks registra o carimbo de data/hora quando você inicia uma leitura com a Trigger.AvailableNow configuração. As linhas processadas pelo lote incluem todos os dados obtidos anteriormente e quaisquer linhas publicadas entretanto com uma marca temporal inferior à marca temporal de início registada. Para mais informações, vejaAvailableNow: Processamento incremental em lote.

Monitorizar as métricas de streaming do Pub/Sub

As métricas de progresso do Structured Streaming reportam o número de linhas recolhidas e prontas a processar, o tamanho das linhas recolhidas e prontas a processar, e o número de duplicados vistos desde o início do fluxo.

Segue-se um exemplo de métricas Pub/Sub:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limitações

Pub/Sub não suporta execução especulativa com spark.speculation.