Gestor eficiente de redução e redistribuição remota

Aplica-se a:✅ Engenharia de Dados de Tecido e Ciência de Dados

A redução de escala eficiente é uma funcionalidade do Microsoft Fabric Spark que desassocia os dados de redistribuição do Spark do tempo de vida do executor. Em vez de fixar os resultados do shuffle nos discos locais dos executores, o Fabric Spark encaminha os dados de shuffle para o Armazenamento de Blobs do Azure (ou migra-os para aí, quando necessário) e permite que a Execução Adaptativa de Consultas (AQE) determine a própria escrita. O resultado é uma redução mais rápida do cluster, menor custo de computação e trabalhos mais resilientes — sem alterações nas suas consultas, cadernos ou pipelines.

Overview

A redução eficiente é construída a partir de quatro capacidades cooperantes:

Capacidade O que faz
Gestor de Embaralhamento Remoto (RSM) Escreve e lê dados de mistura no Armazenamento de Blobs do Azure em vez de discos locais dos executores.
Migração de Baralhamento Move os blocos de shuffle de um executor antes de este ser descomissionado, em vez de os eliminar.
Camada de Decisão Encaminhamento dinâmico em tempo de execução por estágio que mantém os shuffles pequenos locais e desloca os shuffles grandes para armazenamento remoto.
Escrita Aleatória AQE Permite que a Execução Adaptativa de Consultas participe na fase de escrita de shuffle para que o particionamento fique correto logo à primeira.

Pré-requisitos

  • O Native Execution Engine (NEE) deve estar ativado.
  • Escalabilidade automática ativada (recomendada). A redução eficiente da escala também funciona sem dimensionamento automático com as configurações do Spark abaixo.
  • Runtime 1.3 (Apache Spark 3.5) ou posterior.

Como funciona

Quando o Spark processa uma consulta, frequentemente redistribui dados entre fases—um shuffle. Normalmente, os dados de shuffle são armazenados no disco local de cada executor, o que vincula os executores a esses dados. Não podem ser lançados até que todos os consumidores tenham terminado de ler. Esse acoplamento é a principal razão pela qual os clusters não conseguem reduzir a escala rapidamente e pela qual a perda de um executor provoca novas tentativas dispendiosas da fase.

Uma redução eficiente quebra este acoplamento:

  • Large shuffles vão diretamente para Armazenamento de Blobs do Azure através do Remote Shuffle Manager.
  • Pequenas reorganizações mantêm-se no disco local por uma questão de rapidez. Se o respetivo executor tiver de ser desativado mais tarde, o Shuffle Migration move os blocos, em segundo plano, para nós pares ou para armazenamento de contingência.
  • A Camada de Decisão escolhe o caminho certo por estágio em tempo de execução.
  • AQE Shuffle Write garante que o processo de escrita gere o particionamento que o AQE subsequente consome sem voltar a agregá-lo, evitando operações de E/S desnecessárias.
                ┌───────────────────────────┐
   Query  ───►  │   AQE + Decision Layer    │   per-stage choice
                └─────────────┬─────────────┘
                              │
                ┌─────────────▼─────────────┐
                │   AQE Shuffle Write       │   partition-aware writer
                └─────┬─────────────────┬───┘
                      │                 │
              local   ▼                 ▼   remote
        ┌────────────────────┐   ┌──────────────────┐
        │  Local disk +      │   │  RSM → Azure     │
        │  Shuffle Migration │   │  Blob Storage    │
        └─────────┬──────────┘   └─────────┬────────┘
                  │ on decommission        │
                  ▼                        ▼
        fallback storage   Remote shuffle store

Roteamento inteligente (Camada de Decisão)

A Camada de Decisão avalia cada permutação de dados e decide:

  • Grandes redistribuições → Armazenamento de Blobs do Azure. Máximo benefício de redução de escala e tolerância a falhas.
  • Pequenas redistribuições → disco local. Sem sobrecarga de E/S na cloud para transferências muito pequenas. Se o executor for posteriormente desativado, a Migração Aleatória assume o controlo.

O encaminhamento é automático e não requer intervenção do utilizador. A granularidade recomendada é por fase.

Principais benefícios

Custos mais baixos: Pague apenas pelo cálculo que utiliza

Com uma redução eficiente de escala, os executores são libertados assim que o seu trabalho está concluído. Já não permanecem inativos a armazenar dados de shuffle que as tarefas subsequentes poderão vir a ler.

  • Redimensionamento descendente mais rápido. O escalonamento automático remove imediatamente os nós após a conclusão de uma tarefa.
  • Menos computação ociosa. Nenhum executor “zumbi” mantido ativo apenas para disponibilizar o shuffle local.
  • Sem sobreaprovisionamento do disco. As redistribuições de dados de grande dimensão são enviadas para armazenamento de blobs em vez de exigirem discos locais de grande capacidade.
  • Custo de armazenamento limitado. O armazenamento de reserva é limpo automaticamente quando os blocos deixam de ser necessários.

Empregos mais resilientes

Quando os dados de reorganização residem apenas no disco local, uma falha do executor significa que esses dados se perdem e o Spark tem de os recalcular. Com uma redução eficiente da escala, os dados já estão no armazenamento de blobs ou são migrados para lá antes de o executor ser removido.

Scenario Sem uma redução de escala eficiente Com redução de escala eficiente
Falhas do Executor Dados de redistribuição perdidos; etapas reexecutadas Os dados estão seguros em armazenamento; sem recomputação
Preempção de nós Perda de dados, novas tentativas dispendiosas Os dados sobrevivem; O trabalho continua normalmente
Descomissionamento gracioso O modo aleatório foi desativado ao encerrar Blocos migrados para armazenamento de par ou de contingência
Blips de rede durante o fetch Em cascata FetchFailedException As leituras são feitas a partir do armazenamento, sem serem afetadas

Isto elimina a causa mais comum de FetchFailedException em produção.

Escalabilidade mais rápida e verdadeiramente elástica

Sem uma redução eficiente da escala, o autoescalador não consegue recuperar um nó enquanto qualquer executor nele ainda mantém dados de embaralhamento ou dados em cache. A redução eficiente desacopla ambos:

  • Os dados de shuffle estão no armazenamento de blobs (ou são migrados para lá ao encerrar).
  • A cache já não fixa executores. As caches reproduzíveis, como a cache de instantâneos Delta, estão excluídas da proteção contra redução de escala.

O autoescalador pode remover livremente nós inativos e redimensionar o cluster em resposta a alterações na carga de trabalho.

Melhor desempenho em shuffles grandes e desbalanceados

O AQE Shuffle Write permite que a Execução Adaptativa de Consultas modele a própria escrita de shuffle, escolhendo o particionamento que o AQE a jusante consome sem necessidade de nova coalescência e produzindo menos blocos, com dimensões mais adequadas, para armazenamento remoto. Em combinação com a Camada de Decisão, obtém-se um tempo real de execução mais baixo em consultas grandes ou desequilibradas e uma latência inalterada nas pequenas.

Introdução

Aplique esta configuração para ativar o conjunto completo e eficiente de redução:

# Remote Shuffle Manager
spark.conf.set("spark.remote.shuffle.enabled", "true")

# Decision Layer — per-stage routing of local vs. remote shuffle
spark.conf.set("spark.sql.rsm.decisionlayer.enabled.level", "stage")

# AQE participates in shuffle write
spark.conf.set("spark.sql.adaptive.shuffleWrite.enabled", "true")

# Shuffle Migration on executor decommission
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.cleanup", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage", "true")
spark.conf.set("spark.storage.decommission.fallbackStorage.cleanUp", "true")

Não são necessárias alterações de código. Também pode defini-las nas propriedades do Spark do seu ambiente.

Referência de configuração

Gestor de Embaralhamento Remoto (RSM)

Configuração Recomendado O que controla
spark.remote.shuffle.enabled true Ativa a redução eficiente da escala. Os dados de shuffle são enviados para o Armazenamento de Blobs do Azure em vez dos discos locais dos executores.

Camada de Decisão

Configuração Recomendado O que controla
spark.sql.rsm.decisionlayer.enabled.level stage Granularidade à qual a Camada de Decisão aplica o baralhamento de rotas. stage avalia cada estágio Spark de forma independente.

Escrita Aleatória AQE

Configuração Recomendado O que controla
spark.sql.adaptive.shuffleWrite.enabled true Permite que o AQE participe na fase de escrita de shuffle. Produz um particionamento que o AQE subsequente consome sem necessidade de nova coalescência.

Note

O próprio AQE (spark.sql.adaptive.enabled) tem de estar ligado. Está ligado por defeito no Fabric Spark.

Migração do Shuffle na desativação

Configuração Recomendado O que controla
spark.storage.decommission.shuffleBlocks.enabled true Migra blocos de shuffle de um executor que está a ser desativado, em vez de os descartar.
spark.storage.decommission.shuffleBlocks.cleanup true Limpa os blocos de shuffle do executor de origem após uma migração bem-sucedida.
spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage true Se nenhum executor par conseguir aceitar os blocos, migra-os para armazenamento de contingência (Armazenamento de Blobs do Azure).
spark.storage.decommission.fallbackStorage.cleanUp true Remove os blocos aleatórios do armazenamento de contingência assim que deixam de ser necessários, limitando os custos de armazenamento.

Alocação dinâmica consciente da cache

Configuração Recomendado O que controla
spark.dynamicAllocation.preventShutdownExecutorWithCache false Permite a alocação dinâmica para libertar executores mesmo quando estes detêm blocos em cache.
spark.dynamicAllocation.excludeDeltaSnapshotCache true Ignora a cache snapshot Delta ao decidir se um executor ainda mantém cache útil. A cache de instantâneos delta é reprodutível e não deve impedir a redução de escala.

Ajuste avançado (RSM)

A maioria dos utilizadores não precisa de alterar estes predefinidos.

Desempenho de escrita

Configuração Predefinição O que controla
spark.remote.shuffle.partition.buffersize 16777216 (16 MB) Buffer por partição antes de escrever no armazenamento.
spark.remote.shuffle.blocksize 8388608 (8 MB) Tamanho dos blocos individuais carregados no Armazenamento de Blobs.
spark.remote.shuffle.write.maxthreads cores × 16 Número máximo de threads utilizadas para escrever dados de mistura.
spark.remote.shuffle.write.maxtasks 16384 Número máximo de operações de escrita concorrentes.

Desempenho de leitura

Configuração Predefinição O que controla
spark.remote.shuffle.read.parallel.enabled true Fluxos de download paralelo para leituras aleatórias.
spark.remote.shuffle.read.parallelism 4 Fluxos de download paralelos por tarefa.
spark.remote.shuffle.read.prefetchqueuesize 250 Profundidade da fila de pré-leitura durante a leitura.
spark.remote.shuffle.read.maxthreads cores × 4 Número máximo de threads utilizadas para a leitura.

Reliability

Configuração Predefinição O que controla
spark.remote.shuffle.retries 5 Repetir as tentativas em erros transitórios de armazenamento.
spark.remote.shuffle.retrydelayms 800 Intervalo de espera inicial entre tentativas.
spark.remote.shuffle.retrymaxdelayms 60000 Tampa de recuo.

Compression

Configuração Predefinição O que controla
spark.remote.shuffle.compression Utilizações spark.io.compression.codec Formato de compressão para dados de shuffle remoto (por exemplo, lz4, zstd).

Resultados de desempenho

Gráfico que mostra a poupança de custos com a redução eficiente ativada versus desativada num benchmark TPC-DS, demonstrando uma redução de custos de 54%.

Poupança nos custos de computação (benchmark TPC-DS)

Métrico Sem uma redução de escala eficiente Com redução de escala eficiente
Computação Total (VM-Minutes) 14,952 6,880
Redução de custos 54%

O tempo total de execução da tarefa pode ser maior (o dimensionamento automático utiliza menos executores em simultâneo), mas a capacidade de computação faturada é reduzida em mais de metade.

Desempenho da camada de decisão (TPC-DS, RSM ativado)

Direcionar pequenas operações de redistribuição para o disco local e apenas as grandes operações de redistribuição para armazenamento remoto proporciona até 57% de melhoria no tempo de execução face ao direcionamento de todas as operações de redistribuição para armazenamento remoto, com a mesma vantagem em termos de redução de escala.

Limitações

  • NEE obrigatório. A redução eficiente depende do Native Execution Engine.
  • Apenas Armazenamento de Blobs do Azure. Padrão BlockBlobStorage com HNS desativado. As contas do Azure Data Lake Gen2 / com HNS ativado não são suportadas como armazenamento remoto de shuffle.
  • Não é suportado com o Azure Private Link. Ambientes que usam rede de ligação privada não são atualmente compatíveis.
  • A granularidade da Camada de Decisão é atualmente por estágio. O encaminhamento por tarefa ou por partição não faz parte do âmbito.
  • Mudança no comportamento da cache. Com preventShutdownExecutorWithCache=false, os executores na posse de dados cache()/persist() podem ser reduzidos. Cargas de trabalho que dependem fortemente da cache local do executor para dados acedidos com frequência devem ser validadas.