Ingesta de datos de PostgreSQL

Importante

El conector de PostgreSQL para Lakeflow Connect está en versión preliminar pública. Póngase en contacto con el equipo de su cuenta de Databricks para inscribirse en la Vista previa pública.

En esta página se describe cómo ingerir datos de PostgreSQL y cargarlos en Azure Databricks mediante Lakeflow Connect. El conector postgreSQL admite AWS RDS PostgreSQL, Aurora PostgreSQL, Amazon EC2, Azure Database for PostgreSQL, máquinas virtuales de Azure, GCP Cloud SQL for PostgreSQL y bases de datos postgreSQL locales mediante Azure ExpressRoute, AWS Direct Connect o redes VPN.

Antes de empezar

  • Para crear una puerta de enlace de ingesta y una canalización de ingesta, debe cumplir los siguientes requisitos:

    • El área de trabajo está habilitada para Unity Catalog.

    • El proceso sin servidor está habilitado para el área de trabajo. Consulte Requisitos de proceso sin servidor.

    • Si tiene previsto crear una conexión: tiene CREATE CONNECTION privilegios en el metastore. Consulte Administración de privilegios en Unity Catalog.

      Si el conector admite la creación de canalizaciones basadas en la interfaz de usuario, puede crear la conexión y la canalización al mismo tiempo completando los pasos de esta página. Sin embargo, si usa la creación de canalizaciones basadas en API, debe crear la conexión en el Explorador de catálogos antes de completar los pasos de esta página. Consulte Conexión a orígenes de ingesta administrados.

    • Si planea utilizar una conexión existente: Tiene privilegios USE CONNECTION o ALL PRIVILEGES en la conexión.

    • Tiene USE CATALOG privilegios en el catálogo de destino.

    • Tiene privilegios USE SCHEMA, CREATE TABLE y CREATE VOLUME en un esquema existente o privilegios CREATE SCHEMA en el catálogo de destino.

    • Tiene acceso a una instancia principal de PostgreSQL. La replicación lógica solo se admite en instancias principales y no en réplicas de lectura.
    • Permisos sin restricciones para crear clústeres o una directiva personalizada (solo API). Una directiva personalizada para la puerta de enlace debe cumplir los siguientes requisitos:
      • Familia: Proceso de trabajos

      • Invalidaciones de familia de directivas:

        {
          "cluster_type": {
            "type": "fixed",
            "value": "dlt"
          },
          "num_workers": {
            "type": "unlimited",
            "defaultValue": 1,
            "isOptional": true
          },
          "runtime_engine": {
            "type": "fixed",
            "value": "STANDARD",
            "hidden": true
          }
        }
        
      • Databricks recomienda especificar los nodos de trabajo más pequeños posibles para las puertas de enlace de ingesta porque no afectan al rendimiento de la puerta de enlace. La siguiente política de cómputo permite a Azure Databricks escalar la puerta de enlace de ingestión para cumplir con los requisitos de su carga de trabajo. El requisito mínimo es de 8 núcleos para habilitar la extracción de datos eficaz y eficaz de la base de datos de origen.

        {
          "driver_node_type_id": {
            "type": "fixed",
            "value": "Standard_E64d_v4"
          },
          "node_type_id": {
            "type": "fixed",
            "value": "Standard_F4s"
          }
        }
        

        Para obtener más información sobre las directivas de clúster, consulte Seleccionar una directiva de procesamiento.

  • Para ingerir desde PostgreSQL, también debe completar la configuración de origen.

Creación de una puerta de enlace y una canalización de ingesta

Interfaz de usuario de Azure Databricks
  1. En la barra lateral del área de trabajo de Azure Databricks, haga clic en Ingesta de datos.

  2. En la página Agregar datos , en Conectores de Databricks, haga clic en PostgreSQL.

  3. En la página Conexión del asistente de ingesta, seleccione la conexión que almacena las credenciales de acceso de Configurar PostgreSQL para la ingesta en Azure Databricks. Si tiene el CREATE CONNECTION privilegio en la metastore, puede hacer clic en el icono Más. Cree una conexión para crear una nueva conexión con los detalles de autenticación en Configuración de PostgreSQL para la ingesta en Azure Databricks.

  4. Haga clic en Siguiente.

  5. En la página Configuración de Ingesta, escriba un nombre único para la canalización de ingesta. Esta tubería mueve los datos del lugar de almacenaje provisional al destino.

  6. Seleccione un catálogo y un esquema en el que escribir registros de eventos. El registro de eventos contiene registros de auditoría, comprobaciones de calidad de datos, progreso de la canalización y errores. Si tiene USE CATALOG y CREATE SCHEMA privilegios en el catálogo, puede hacer clic en el icono de suma. Crear esquema en el menú desplegable para crear un nuevo esquema.

  7. (Opcional) Establezca Actualización completa automática para todas las tablas en Activado. Cuando la actualización automática está activada, la canalización intenta corregir automáticamente problemas como eventos de limpieza de registros y ciertos tipos de evolución del esquema actualizando completamente la tabla afectada. Si el seguimiento del historial está habilitado, una actualización completa borra ese historial.

  8. Escriba un nombre único para la puerta de enlace de ingesta. La puerta de enlace es una canalización que extrae los cambios del origen y los almacena provisionalmente para que se cargue la canalización de ingesta.

  9. Seleccione un catálogo y un esquema para la ubicación de almacenamiento provisional. En esta ubicación se crea un volumen para almacenar provisionalmente los datos extraídos. Si tiene USE CATALOG y CREATE SCHEMA privilegios en el catálogo, puede hacer clic en el icono de suma. Crear esquema en el menú desplegable para crear un nuevo esquema.

  10. Haga clic en Crear canalización y continúe.

  11. En la página Origen , seleccione las tablas que se van a ingerir. Si selecciona tablas específicas, puede configurar las opciones de tabla:

    a) (Opcional) En la pestaña Configuración , especifique un nombre de destino para cada tabla ingerida. Esto resulta útil para diferenciar entre las tablas de destino al ingerir un objeto en el mismo esquema varias veces. Consulte Nombre de una tabla de destino.

    a) (Opcional) Cambie la configuración de seguimiento del historial predeterminada. Consulte Habilitación del seguimiento del historial (tipo 2 de SCD).

  12. Haga clic en Siguiente y, a continuación, haga clic en Guardar y continuar.

  13. En la página Destino , seleccione un catálogo y un esquema en el que cargar datos. Si tiene USE CATALOG y CREATE SCHEMA privilegios en el catálogo, puede hacer clic en el icono de suma. Crear esquema en el menú desplegable para crear un nuevo esquema.

  14. Haga clic en Guardar y continuar.

  15. En la página Configuración de la base de datos , escriba el nombre del espacio de replicación y el nombre de la publicación de cada base de datos desde la que desea ingerir.

  16. (Opcional) En la página Programaciones y notificaciones , haga clic en el icono Más. Crear programación. Establezca la frecuencia para actualizar las tablas de destino.

  17. (Opcional) Haga clic en el icono Más. Agregue una notificación para establecer notificaciones por correo electrónico para que la operación de canalización se complete correctamente o no y, a continuación, haga clic en Guardar y ejecutar canalización.

Antes de ingerir mediante paquetes de Automatización declarativa, API de Databricks, SDK de Databricks, la CLI de Databricks o Terraform, debe tener acceso a una conexión de Catálogo de Unity existente. Para obtener instrucciones, consulte Conexión a orígenes de ingesta administrados.

Crear el catálogo de staging y el esquema

El catálogo y el esquema de almacenamiento provisional pueden ser los mismos que el catálogo y el esquema de destino. El catálogo de ensayo no puede ser un catálogo externo.

Interfaz de línea de comandos (CLI)

export CONNECTION_NAME="my_postgresql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_postgresql_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="postgresql-instance.example.com"
export DB_PORT="5432"
export DB_DATABASE="your_database"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"

output=$(databricks connections create --json '{
  "name": "'"$CONNECTION_NAME"'",
  "connection_type": "POSTGRESQL",
  "options": {
    "host": "'"$DB_HOST"'",
    "port": "'"$DB_PORT"'",
    "database": "'"$DB_DATABASE"'",
    "user": "'"$DB_USER"'",
    "password": "'"$DB_PASSWORD"'"
  }
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

La puerta de enlace de ingesta extrae la instantánea y cambia los datos de la base de datos de origen y los almacena en el volumen de almacenamiento provisional de Unity Catalog. Debe usar la puerta de enlace como una canalización continua. Es fundamental para PostgreSQL evitar el sobredimensionamiento del Write-Ahead Log (WAL) y asegurarse de que las ranuras de replicación no acumulen cambios sin consumir.

La canalización de ingesta aplica la instantánea y cambia los datos del volumen de almacenamiento provisional en tablas de streaming de destino.

Agrupaciones de automatización declarativa

Puede implementar una canalización de ingestión usando Declarative Automation Bundles. Las agrupaciones pueden contener definiciones de YAML de trabajos y tareas, se administran mediante la CLI de Databricks y se pueden compartir y ejecutar en diferentes áreas de trabajo de destino (como desarrollo, almacenamiento provisional y producción). Para obtener más información, consulte Agrupaciones de automatización declarativa.

  1. Cree una agrupación mediante la CLI de Databricks:

    databricks bundle init
    
  2. Agregue dos nuevos archivos de recursos al lote:

    • Un archivo de definición de canalización (por ejemplo, resources/postgresql_pipeline.yml).
    • Un archivo de definición de trabajo que controla la frecuencia de ingesta de datos (por ejemplo, resources/postgresql_job.yml).

    A continuación se muestra un archivo resources/postgresql_pipeline.yml de ejemplo:

    variables:
      # Common variables used multiple places in the DAB definition.
      gateway_name:
        default: postgresql-gateway
      dest_catalog:
        default: main
      dest_schema:
        default: ingest-destination-schema
    
    resources:
      pipelines:
        gateway:
          name: ${var.gateway_name}
          gateway_definition:
            connection_name: <postgresql-connection>
            gateway_storage_catalog: main
            gateway_storage_schema: ${var.dest_schema}
            gateway_storage_name: ${var.gateway_name}
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
    
        pipeline_postgresql:
          name: postgresql-ingestion-pipeline
          ingestion_definition:
            ingestion_gateway_id: ${resources.pipelines.gateway.id}
            source_type: POSTGRESQL
            objects:
              # Modify this with your tables!
              - table:
                  # Ingest the table public.orders to dest_catalog.dest_schema.orders.
                  source_catalog: your_database
                  source_schema: public
                  source_table: orders
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - schema:
                  # Ingest all tables in the public schema to dest_catalog.dest_schema. The destination
                  # table name will be the same as it is on the source.
                  source_catalog: your_database
                  source_schema: public
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
            source_configurations:
              - catalog:
                  source_catalog: your_database
                  postgres:
                    slot_config:
                      slot_name: databricks_slot
                      publication_name: databricks_publication
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
    

    A continuación se muestra un archivo resources/postgresql_job.yml de ejemplo:

    resources:
      jobs:
        postgresql_dab_job:
          name: postgresql_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_postgresql.id}
    
  3. Implemente la canalización mediante la CLI de Databricks:

    databricks bundle deploy
    
Cuaderno de Azure Databricks

Actualice la celda Configuration del cuaderno siguiente con la conexión de origen, el catálogo de destino, el esquema de destino y las tablas que se van a importar desde el origen.

Creación de la puerta de enlace y la canalización de ingesta

Obtener el cuaderno

Databricks CLI

Para crear la puerta de enlace:

gateway_json=$(cat <<EOF
{
  "name": "$GATEWAY_PIPELINE_NAME",
  "catalog": "$STAGING_CATALOG",
  "schema": "$STAGING_SCHEMA",
  "gateway_definition": {
    "connection_name": "$CONNECTION_NAME",
    "gateway_storage_catalog": "$STAGING_CATALOG",
    "gateway_storage_schema": "$STAGING_SCHEMA",
    "gateway_storage_name": "$GATEWAY_PIPELINE_NAME"
  }
}
EOF
)

output=$(databricks pipelines create --json "$gateway_json")
echo $output
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')

Para crear la canalización de ingesta:

pipeline_json=$(cat <<EOF
{
  "name": "$INGESTION_PIPELINE_NAME",
  "catalog": "$TARGET_CATALOG",
  "schema": "$TARGET_SCHEMA",
  "ingestion_definition": {
    "ingestion_gateway_id": "$GATEWAY_PIPELINE_ID",
    "source_type": "POSTGRESQL",
    "objects": [
      {
        "table": {
          "source_catalog": "your_database",
          "source_schema": "public",
          "source_table": "orders",
          "destination_catalog": "$TARGET_CATALOG",
          "destination_schema": "$TARGET_SCHEMA",
          "destination_table": "orders"
        }
      },
      {
        "schema": {
          "source_catalog": "your_database",
          "source_schema": "public",
          "destination_catalog": "$TARGET_CATALOG",
          "destination_schema": "$TARGET_SCHEMA"
        }
      }
    ],
    "source_configurations": [
      {
        "catalog": {
          "source_catalog": "your_database",
          "postgres": {
            "slot_config": {
              "slot_name": "databricks_slot",
              "publication_name": "databricks_publication"
            }
          }
        }
      }
    ]
  }
}
EOF
)

databricks pipelines create --json "$pipeline_json"

Requiere la CLI de Databricks v0.276.0 o posterior.

Terraform

Puede usar Terraform para implementar y administrar canalizaciones de ingesta de PostgreSQL. Para obtener un marco de ejemplo completo, incluidas las configuraciones de Terraform para crear puertas de enlace y canalizaciones de ingesta, consulte el repositorio de ejemplos de Terraform de Lakeflow Connect en GitHub.

Inicio, programación y establecimiento de alertas en la canalización

Para obtener información sobre cómo iniciar, programar y establecer alertas en la canalización, consulte Tareas comunes de mantenimiento de canalización.

Comprobación de la ingesta de datos correcta

La vista en lista en la página de detalles de la canalización muestra el número de registros procesados conforme se incorporan los datos. Estos números se actualizan automáticamente.

Comprobación de la replicación

Las columnas Upserted records y Deleted records no se muestran de forma predeterminada. Puede habilitarlas haciendo clic en el botón de configuración de columnas Icono de configuración de columnas y seleccionándolas.

Recursos adicionales