Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este tutorial se muestra cómo desarrollar e implementar la primera canalización de ETL (extracción, transformación y carga) para la orquestación de datos con Apache Spark. Aunque en este tutorial se utilice el cómputo de propósito general de Databricks, también puede usar el cómputo sin servidor si está habilitado para su espacio de trabajo.
También puede usar canalizaciones declarativas de Spark de Lakeflow para compilar canalizaciones ETL. Las canalizaciones declarativas de Spark de Databricks Lakeflow reducen la complejidad de crear, implementar y mantener canalizaciones ETL de producción. Consulte Tutorial: Construcción de una canalización ETL con Lakeflow Spark Declarative Pipelines.
Al final de este artículo, sabrá cómo:
- Inicie un recurso informático de propósito general de Databricks.
- Cree un cuaderno de Databricks.
- Configurar la ingesta incremental de datos en Delta Lake con Auto Loader.
- Procesar e interactuar con los datos.
- Programe un cuaderno como un trabajo de Databricks.
En este tutorial se usan cuadernos interactivos para completar tareas ETL comunes en Python o Scala.
También puede usar el proveedor de Terraform de Databricks para crear los recursos mencionados en este artículo. Consulte Creación de clústeres, cuadernos y trabajos con Terraform.
Requisitos
- Ha iniciado sesión en un área de trabajo de Azure Databricks.
- Tiene permiso para crear un recurso informático.
Nota:
Si no tiene privilegios de control de proceso, puede completar la mayoría de los pasos siguientes siempre y cuando tenga acceso a un recurso de proceso.
Paso 1: Creación de un recurso de proceso
Para realizar análisis exploratorios de datos e ingeniería de datos, cree un recurso de proceso para ejecutar comandos.
- Haga clic en
Computar en la barra lateral. - En la página Proceso, haga clic en Crear proceso.
- Especifique un nombre único para el recurso de proceso, deje los valores restantes en su estado predeterminado y haga clic en Crear proceso.
Para más información sobre el proceso de Databricks, consulte Proceso.
Paso 2: creación de un cuaderno de Databricks
Para crear un cuaderno en el área de trabajo, haga clic en
Nuevo en la barra lateral y, después, haga clic en Cuaderno. Se abre un cuaderno en blanco en el área de trabajo.
Para obtener más información sobre cómo crear y administrar cuadernos, consulte Administración de cuadernos.
Paso 3: configuración de Auto Loader para ingerir datos en Delta Lake
Databricks recomienda usar el Auto Loader para la ingesta de datos incremental. Auto Loader detecta y procesa automáticamente los nuevos archivos a medida que llegan al almacenamiento de objetos en la nube.
Databricks recomienda el almacenamiento de datos con Delta Lake. Delta Lake es una capa de almacenamiento de código abierto que proporciona transacciones ACID y habilita el data lakehouse. Delta Lake es el formato predeterminado para las tablas que se crean en Databricks.
Para configurar Auto Loader a fin de ingerir datos en Delta Lake, copie y pegue el código siguiente en la celda vacía del cuaderno:
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Scala
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
Nota:
Las variables que se definen en este código deben permitirle que la ejecute de forma segura, sin correr el riesgo de entrar en conflicto con los recursos del área de trabajo existentes u otros usuarios. Los permisos de red o almacenamiento restringidos producirán errores cuando ejecuten este código; póngase en contacto con el administrador del área de trabajo para solucionar problemas de estas restricciones.
Para obtener más información sobre Auto Loader, consulte Auto Loader.
Paso 4: procesamiento e interacción con datos
Los cuadernos ejecutan la lógica celda por celda. Para ejecutar la lógica en tu celda:
Para ejecutar la celda que completó en el paso anterior, selecciónela y presione SHIFT+ENTER.
Para consultar la tabla que acaba de crear, copie y pegue el código siguiente en una celda vacía y presione MAYÚS+ENTRAR para ejecutar la celda.
Python
df = spark.read.table(table_name)Scala
val df = spark.read.table(table_name)Para obtener una versión preliminar de los datos de DataFrame, copie y pegue el código siguiente en una celda vacía y, a continuación, presione SHIFT+ENTER para ejecutar la celda.
Python
display(df)Scala
display(df)
Para más información sobre las opciones interactivas para visualizar datos, consulte Visualizaciones en cuadernos de Databricks y editor de SQL.
Paso 5: programación de un trabajo
Puede ejecutar cuadernos de Databricks como scripts de producción al agregarlos como una tarea en un trabajo de Databricks. En este paso, creará un nuevo trabajo que podrá desencadenar manualmente.
Para programar el cuaderno como una tarea:
- Haga clic en Programación en el lado derecho de la barra de encabezado.
- Introduzca un nombre único para el nombre del trabajo.
- Haga clic en Manual.
- En la lista desplegable Proceso , seleccione el recurso de proceso que creó en el paso 1.
- Haga clic en Crear.
- En la ventana que aparece, haga clic en Ejecutar ahora.
- Para ver los resultados de la ejecución del trabajo, haga clic en el icono
junto a la marca de tiempo Última ejecución.
Para obtener más información sobre los trabajos, consulte ¿Qué son los trabajos?.
Integraciones adicionales
Obtenga más información sobre las integraciones y herramientas para la ingeniería de datos con Azure Databricks: