Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Este tutorial mostra como desenvolver e implantar seu primeiro pipeline ETL (extrair, transformar e carregar) para orquestração de dados com o Apache Spark. Embora este tutorial use computação multiuso do Databricks, você também pode usar a computação sem servidor se ela estiver habilitada para seu espaço de trabalho.
Você também pode usar Lakeflow Spark Declarative Pipelines para construir pipelines ETL. Databricks Lakeflow Spark Declarative Pipelines reduz a complexidade da construção, da implantação e da manutenção de pipelines de ETL em produção. Consulte o Tutorial: Como construir um pipeline ETL com Lakeflow Spark Declarative Pipelines.
Ao final deste artigo, você saberá como:
- Lançar um recurso de computação multiuso Databricks.
- Crie um caderno Databricks.
- Configure a ingestão incremental de dados para Delta Lake com o Auto Loader.
- Processar e interagir com os dados.
- Agende um bloco de notas como uma tarefa do Databricks.
Este tutorial utiliza cadernos interativos para completar tarefas comuns de ETL em Python ou Scala.
Você também pode usar o provedor Databricks Terraform para criar os recursos deste artigo. Consulte Criar clusters, blocos de anotações e trabalhos com o Terraform.
Requisitos
- Está iniciado sessão num espaço de trabalho do Azure Databricks.
- Tens permissão para criar um recurso de computação.
Nota
Se não tiver privilégios de controlo de computação, ainda pode completar a maioria dos passos abaixo, desde que tenha acesso a um recurso de computação.
Passo 1: Criar um recurso de computação
Para fazer análise exploratória de dados e engenharia de dados, crie um recurso de computação para executar comandos.
- Clique em
Calcular na barra lateral. - Na página de Computação, clique em Criar Computação.
- Especifique um nome único para o recurso de computação, deixe os valores restantes no seu estado padrão e clique em Criar cálculo.
Para saber mais sobre computação Databricks, consulte Computar.
Etapa 2: Criar um bloco de anotações Databricks
Para criar um bloco de notas na sua área de trabalho, clique
em Novo na barra lateral e, em seguida, clique em Bloco de Notas. Um bloco de anotações em branco é aberto no espaço de trabalho.
Para saber mais sobre como criar e gerir blocos de notas, consulte Gerir blocos de notas.
Etapa 3: Configurar o carregador automático para ingerir dados no Delta Lake
O Databricks recomenda o uso do Auto Loader para ingestão incremental de dados. O Auto Loader deteta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem.
A Databricks recomenda o armazenamento de dados com o Delta Lake. Delta Lake é uma camada de armazenamento de código aberto que fornece transações ACID e viabiliza o *data lakehouse*. Delta Lake é o formato padrão para tabelas criadas no Databricks.
Para configurar o Auto Loader para ingerir dados em uma tabela Delta Lake, copie e cole o seguinte código na célula vazia do seu bloco de anotações:
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
linguagem de programação Scala
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
Nota
As variáveis definidas neste código devem permitir que você o execute com segurança sem risco de conflito com ativos de espaço de trabalho existentes ou outros usuários. Permissões restritas de rede ou armazenamento gerarão erros ao executar esse código; Entre em contato com o administrador do espaço de trabalho para solucionar essas restrições.
Para saber mais sobre o Auto Loader, consulte O que é o Auto Loader?.
Etapa 4: Processar e interagir com os dados
Os notebooks executam a lógica célula por célula. Para executar a lógica na sua célula:
Para executar a célula concluída na etapa anterior, selecione-a e pressione SHIFT+ENTER.
Para consultar a tabela que acabou de criar, copie e cole o código seguinte numa célula vazia e, em seguida, prima SHIFT+ENTER para executar a célula.
Python
df = spark.read.table(table_name)linguagem de programação Scala
val df = spark.read.table(table_name)Para visualizar os dados em seu DataFrame, copie e cole o código a seguir em uma célula vazia e pressione SHIFT+ENTER para executar a célula.
Python
display(df)linguagem de programação Scala
display(df)
Para saber mais sobre opções interativas para visualizar dados, consulte Visualizações em blocos de anotações Databricks e editor SQL.
Etapa 5: Agendar um trabalho
Pode executar notebooks do Databricks como scripts de produção, adicionando-os como uma tarefa num job do Databricks. Nesta etapa, você criará um novo trabalho que poderá ser acionado manualmente.
Para agendar o seu bloco de notas como uma tarefa:
- Clique em Agendar no lado direito da barra de cabeçalho.
- Insira um nome exclusivo para o nome do trabalho.
- Clique em Manual.
- No menu suspenso Compute, selecione o recurso de computação que criou no passo 1.
- Clique em Criar.
- Na janela apresentada, clique em Executar agora.
- Para ver os resultados da execução do trabalho, clique no
ícone ao lado do carimbo de data/hora da última execução .
Para obter mais informações sobre empregos, consulte O que são empregos?.
Integrações adicionais
Saiba mais sobre integrações e ferramentas para engenharia de dados com o Azure Databricks: