Compartilhar via


Transmissão de dados para um lakehouse com Spark

O streaming estruturado é um mecanismo de processamento de fluxo escalonável e tolerante a falhas criado no Spark. Ele trata um fluxo de dados ao vivo como uma tabela à qual as novas linhas são acrescentadas continuamente. O Streaming Estruturado dá suporte a fontes de arquivo internas, como CSV, JSON, ORC e Parquet, juntamente com serviços de mensagens como Kafka e Hubs de Eventos do Azure.

Este artigo aborda a configuração de uma fonte de streaming, como Hubs de Eventos do Azure, a ingestão de dados de streaming em uma tabela Delta de lakehouse, a otimização do desempenho de gravação com particionamento e agrupamento de eventos em lote, e a execução de trabalhos de streaming de forma confiável em produção.

Configurar uma fonte de streaming

Para transmitir dados para um lakehouse, primeiro configure uma conexão com a sua fonte de streaming. Os Hubs de Eventos do Azure são uma opção comum. Use o Conector de Hubs de Eventos do Azure para Apache Spark para conectar seu aplicativo Spark aos Hubs de Eventos do Azure.

Uma configuração básica dos Hubs de Eventos requer o nome do namespace dos Hubs de Eventos, o nome do hub, o nome da chave de acesso compartilhado e o grupo de consumidores.

Um grupo de consumidores é uma exibição de um hub de eventos inteiro. Os grupos de consumidores permitem que vários aplicativos de consumo tenham uma exibição separada do fluxo de eventos e leiam o fluxo independentemente em seu próprio ritmo e com seus próprios deslocamentos.

As partições nos Hubs de Eventos permitem processar grandes volumes de eventos em paralelo. Um único processador tem uma capacidade limitada para lidar com eventos por segundo, enquanto vários processadores podem trabalhar em paralelo entre partições.

Se muitas partições forem usadas com uma taxa de ingestão baixa, os leitores de partição lidarão com uma pequena parte dos dados, causando processamento não otimizado. O número ideal de partições depende da taxa de processamento desejada. À medida que você aumenta o número de unidades de taxa de transferência em seu namespace, talvez você queira partições extras para permitir que leitores simultâneos atinjam sua taxa de transferência máxima.

Teste o melhor número de partições para seu cenário de taxa de transferência. Cenários com alta taxa de transferência geralmente usam 32 ou mais partições.

Tabela delta como um coletor de streaming

O Delta Lake é uma camada de armazenamento de software livre que fornece transações ACID (atomicidade, consistência, isolamento e durabilidade) sobre o armazenamento do data lake. Na Engenharia de Dados do Fabric, o Delta Lake dá suporte a upserts, compactação de dados, viagem no tempo, evolução do esquema e armazenamento em formato aberto.

Com o delta como formato de saída em writeStream, os dados de streaming fluem diretamente para uma tabela Delta. O exemplo a seguir lê dos Hubs de Eventos, analisa o corpo da mensagem e grava em uma tabela Delta:

import pyspark.sql.functions as f
from pyspark.sql.types import *

df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

Schema = StructType([
    StructField("<column_name_01>", StringType(), False),
    StructField("<column_name_02>", StringType(), False),
    StructField("<column_name_03>", DoubleType(), True),
    StructField("<column_name_04>", LongType(), True),
    StructField("<column_name_05>", LongType(), True),
])

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .toTable("deltaeventstable")
)

No código, format("delta") define Delta como o formato de saída, outputMode("append") grava apenas novas linhas na tabela e toTable("deltaeventstable") persiste os dados transmitidos para uma tabela Delta gerenciada.

Otimizar o desempenho do streaming

Depois que a ingestão básica de streaming funcionar, você poderá melhorar a taxa de transferência e a organização de arquivos com as técnicas de otimização nas seções a seguir.

Dados de partição para escritas

Para otimizar a taxa de transferência, particione seus dados com eficiência. O particionamento melhora a taxa de transferência de gravação e o desempenho da consulta downstream. Você pode particionar dados na memória, no disco ou em ambos.

Em disco – Use partitionBy() para organizar dados em subdiretórios com base em valores de coluna. Escolha colunas com boa cardinalidade que produzam arquivos de tamanho ideal. Evite colunas que criem muitas partições minúsculas ou poucas grandes.

Na memória — use repartition() ou coalesce() para distribuir dados entre nós de trabalho antes de escrever:

  • repartition() aumenta ou diminui as partições com um embaralhamento completo, equilibrando os dados de forma uniforme.
  • coalesce() diminui apenas as partições, minimizando a movimentação de dados.

A combinação de ambas as abordagens funciona bem para cenários de alta taxa de transferência. O exemplo a seguir divide os dados em 48 partições na memória (correspondendo aos núcleos de CPU disponíveis) e, em seguida, partições no disco por duas colunas:

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .repartition(48)
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .partitionBy("<column_name_01>", "<column_name_02>")
    .toTable("deltaeventstable")
)

Usar gravação otimizada

Como alternativa ao particionamento manual, a Gravação Otimizada mescla ou divide partições antes de gravar, maximizando a taxa de transferência de disco sem chamadas manuais repartition()coalesce() . Habilite-o com uma configuração do Spark:

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)

Com a Gravação Otimizada habilitada, você pode remover repartition() ou coalesce() de seu código e permitir que o Spark manipule o dimensionamento da partição. Você ainda pode usar partitionBy() para a organização de nível de disco.

Eventos em lote com gatilhos

Para otimizar ainda mais o desempenho de gravação, agrupe os eventos em lotes antes de gravá-los em disco. Por padrão, o Spark processa cada microbatch assim que a anterior for concluída. A configuração de um intervalo de gatilho acumula dados durante um período de tempo e os grava em menos operações maiores. Lotes maiores produzem arquivos Delta maiores e reduzem a sobrecarga de arquivos pequenos.

O exemplo a seguir processa eventos em intervalos de um minuto:

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .partitionBy("<column_name_01>", "<column_name_02>")
    .trigger(processingTime="1 minute")
    .toTable("deltaeventstable")
)

Analise o volume de dados de entrada e escolha um intervalo de processamento que produz arquivos Parquet bem dimensionados na tabela Delta.

Executar trabalhos de streaming em produção

Os notebooks Spark são uma ferramenta eficaz para desenvolver e testar a lógica de streaming. No entanto, para cargas de trabalho de produção que precisam ser executadas continuamente, use definições de trabalho do Spark. As definições de trabalho do Spark são tarefas não interativas orientadas a código que são executadas em um cluster Spark e fornecem maior robustez e disponibilidade.

A infraestrutura que executa um trabalho de streaming pode encontrar problemas que interrompem o trabalho, como falhas de hardware ou aplicação de patch de infraestrutura. Uma política de repetição reinicia automaticamente a tarefa quando ela é interrompida inesperadamente. Configure a política de repetição em uma definição de trabalho do Spark para especificar quantas vezes reiniciar o trabalho (até tentativas infinitas) e o intervalo de tempo entre novas tentativas. Com uma política de repetição habilitada, seu trabalho de streaming continuará em execução até que você a interrompa explicitamente.

O hub de monitoramento do Fabric inclui uma guia de Streaming Estruturado com métricas, incluindo Taxa de Entrada, Taxa de Processamento, Linhas de Entrada, Duração do Lote e Duração da Operação.