Partilhar via


Dados a fluir para uma casa de lago com o Spark

O streaming estruturado é um motor de processamento de fluxos escalável e tolerante a falhas, construído sobre o Spark. Trata um fluxo de dados em tempo real como uma tabela à qual novas linhas são continuamente adicionadas. O Structured Streaming suporta fontes de ficheiros integradas como CSV, JSON, ORC e Parquet, juntamente com serviços de mensagens como Kafka e Azure Event Hubs.

Este artigo aborda a configuração de uma fonte de streaming como os Azure Event Hubs, a ingestão de dados em streaming numa tabela Delta de Lakehouse, a otimização do desempenho de escrita com particionamento e batch de eventos, e a execução de trabalhos de streaming de forma fiável em produção.

Configurar uma fonte de streaming

Para transmitir dados para uma casa de lago, configura primeiro uma ligação à tua fonte de streaming. O Azure Event Hubs é uma escolha comum. Use o Azure Event Hubs Connector para o Apache Spark para ligar a sua aplicação Spark ao Azure Event Hubs.

Uma configuração básica de Event Hubs requer o nome do espaço de nomes dos Event Hubs, nome do hub, nome da chave de acesso partilhada e grupo de consumidores.

Um grupo de consumidores é uma visão de um hub de eventos inteiro. Os grupos de consumidores permitem que múltiplas aplicações consumidoras tenham uma visão separada do fluxo de eventos e leiam o fluxo de forma independente, ao seu próprio ritmo e com os seus próprios deslocamentos.

As partições nos Event Hubs permitem-lhe processar grandes volumes de eventos em paralelo. Um único processador tem uma capacidade limitada para lidar com eventos por segundo, enquanto múltiplos processadores podem trabalhar em paralelo entre partições.

Se forem usadas demasiadas partições com uma taxa de ingestão baixa, os leitores de partições lidam com uma pequena parte dos dados, causando um processamento não ótimo. O número ideal de partições depende da taxa de processamento desejada. À medida que aumentas o número de unidades de rendimento no teu namespace, podes querer partições extra para permitir que leitores concorrentes alcancem o seu rendimento máximo.

Testa o melhor número de partições para o teu cenário de throughput. Cenários com alto débito usam normalmente 32 ou mais partições.

Tabela Delta como destino de streaming

Delta Lake é uma camada de armazenamento de código aberto que fornece transações ACID (atomicidade, consistência, isolamento e durabilidade) em cima do armazenamento de data lake. Na Engenharia de Dados no Fabric, o Delta Lake suporta upserts, compactação de dados, viagem no tempo, evolução de esquemas e armazenamento em formato aberto.

Com delta como formato de saída, os dados em streaming writeStream fluem diretamente para uma tabela Delta. O exemplo seguinte lê-se a partir dos Event Hubs, analisa o corpo da mensagem e escreve numa tabela Delta:

import pyspark.sql.functions as f
from pyspark.sql.types import *

df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

Schema = StructType([
    StructField("<column_name_01>", StringType(), False),
    StructField("<column_name_02>", StringType(), False),
    StructField("<column_name_03>", DoubleType(), True),
    StructField("<column_name_04>", LongType(), True),
    StructField("<column_name_05>", LongType(), True),
])

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .toTable("deltaeventstable")
)

No código, format("delta") define Delta como formato de saída, outputMode("append") escreve apenas novas linhas na tabela e toTable("deltaeventstable") persiste os dados transmitidos para uma tabela Delta gerida.

Otimizar o desempenho do streaming

Quando a ingestão básica de streaming funcionar, pode melhorar o débito e a organização dos ficheiros com as técnicas de otimização das secções seguintes.

Particionar dados para gravação

Para otimizar o rendimento, particione os seus dados de forma eficaz. A particionação melhora tanto o débito de escrita como o desempenho das consultas subsequentes. Podes particionar os dados na memória, no disco, ou em ambos.

No disco — Use partitionBy() para organizar dados em subdiretórios com base nos valores das colunas. Escolha colunas com boa cardinalidade que produzam ficheiros de tamanho ideal. Evite colunas que criem partições muito pequenas ou demasiado poucas grandes.

Na memória — Usar repartition() ou coalesce() para distribuir dados entre os nós trabalhadores antes de escrever:

  • repartition() aumenta ou diminui as partições com um baralho completo, equilibrando os dados de forma equilibrada.
  • coalesce() apenas diminui as partições, minimizando o movimento de dados.

Combinar ambas as abordagens funciona bem para cenários de alto rendimento. O exemplo seguinte divide os dados em 48 partições na memória (correspondendo aos núcleos de CPU disponíveis) e depois particiona o disco por duas colunas:

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .repartition(48)
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .partitionBy("<column_name_01>", "<column_name_02>")
    .toTable("deltaeventstable")
)

Use Escrita Otimizada

Como alternativa ao particionamento manual, a Escrita Otimizada funde ou divide as partições antes da escrita, maximizando o rendimento do disco sem as chamadas manuais repartition() ou coalesce(). Ative-o com uma configuração do Spark:

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)

Com a Escrita Otimizada ativada, podes remover repartition() ou coalesce() do teu código e deixar o Spark tratar do tamanho das partições. Ainda podes usar partitionBy() para organização ao nível do disco.

Eventos em lote com acionadores

Para otimizar ainda mais o desempenho da escrita, faça eventos em lote antes de os gravar no disco. Por defeito, o Spark processa cada microlote assim que o anterior termina. Definir um intervalo de ativação acumula dados ao longo de um período de tempo e escreve-os em operações maiores e menos frequentes. Lotes maiores produzem ficheiros Delta maiores e reduzem a sobrecarga dos ficheiros pequenos.

O seguinte exemplo processa eventos em intervalos de um minuto:

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .partitionBy("<column_name_01>", "<column_name_02>")
    .trigger(processingTime="1 minute")
    .toTable("deltaeventstable")
)

Analise o volume de dados recebidos e escolha um intervalo de processamento que produza ficheiros Parquet de bom tamanho na tabela Delta.

Executar trabalhos de streaming em produção

Os cadernos Spark são uma ferramenta eficaz para desenvolver e testar lógica de streaming. No entanto, para cargas de trabalho de produção que têm de ser executadas continuamente, use as definições de trabalhos do Spark. As definições de funções Spark são tarefas não interativas, orientadas a código, que correm num cluster Spark e proporcionam maior robustez e disponibilidade.

A infraestrutura que executa um trabalho de streaming pode encontrar problemas que interrompam o trabalho, como falhas de hardware ou patches de infraestrutura. Uma política de nova tentativa reinicia automaticamente o trabalho quando este para inesperadamente. Configure a política de repetição numa definição de trabalho Spark para especificar quantas vezes o trabalho deve ser reiniciado (até repetições infinitas) e o intervalo de tempo entre as repetições. Com uma política de reintento ativada, o seu trabalho de streaming continua a executar até que o pare explicitamente.

O hub de monitorização Fabric inclui um separador de Streaming Estruturado com métricas como Taxa de Entrada, Taxa de Processo, Linhas de Entrada, Duração do Batch e Duração da Operação.