Adición de un origen de CDC de base de datos de PostgreSQL a una secuencia de eventos

En este artículo se muestra cómo agregar un origen de captura de datos modificados (CDC) de base de datos de PostgreSQL a una secuencia de eventos.

El conector de origen de Captura de Datos Modificados (CDC) de la base de datos PostgreSQL para los flujos de eventos de Microsoft Fabric permite capturar una instantánea de los datos actuales en una base de datos PostgreSQL. Actualmente, la captura de datos modificados de base de datos de PostgreSQL (CDC) se admite desde los siguientes servicios a los que se puede acceder públicamente a las bases de datos:

  • Base de datos de Azure para PostgreSQL
  • Amazon RDS for PostgreSQL
  • Amazon Aurora PostgreSQL
  • Google Cloud SQL for PostgreSQL

Una vez que el origen CDC de la base de datos PostgreSQL se agrega al flujo de eventos, captura los cambios de nivel de fila en las tablas especificadas. Estos cambios se pueden procesar en tiempo real y enviarlos a diferentes destinos para su posterior análisis.

Nota:

Con DeltaFlow (versión preliminar), puede transformar eventos CDC sin procesar de Debezium en flujos listos para análisis que reflejen la estructura de la tabla de origen. DeltaFlow automatiza el registro de esquemas, la administración de tablas de destino y el control de la evolución del esquema. Para usar DeltaFlow, elija Eventos listos para análisis y esquema actualizado automáticamente durante el paso de control de esquemas.

Requisitos previos

Habilitación de CDC en la base de datos postgreSQL

En esta sección se usa Azure Database for PostgreSQL como ejemplo.

Para habilitar CDC en el servidor flexible de Azure Database for PostgreSQL, siga estos pasos:

  1. En su página de Azure Database for PostgreSQL Flexible Server en el portal de Azure, seleccione Parámetros del servidor en el menú de navegación.

  2. En la página Parámetros del servidor:

    • Establezca wal_level en logical.
    • Actualice max_worker_processes al menos a 16.

    Captura de pantalla de la habilitación de CDC para una implementación de servidor flexible.

  3. Guarde los cambios y reinicie el servidor.

  4. Confirme que la instancia de servidor flexible de Azure Database for PostgreSQL permite el tráfico de red pública.

  5. Conceda permisos de replicación de usuarios administradores mediante la ejecución de la siguiente instrucción SQL. Si desea usar otra cuenta de usuario para conectar la base de datos postgreSQL (DB) para capturar CDC, asegúrese de que el usuario es el propietario de la tabla.

    ALTER ROLE <admin_user_or_table_owner_user> WITH REPLICATION;
    

Inicio del Asistente para seleccionar un origen de datos

Si aún no ha agregado ningún origen a la secuencia de eventos, seleccione el icono Conectar orígenes de datos . También puede seleccionar Agregar origen>Conectar orígenes de datos en la cinta de opciones.

Captura de pantalla que muestra la selección del icono para usar un origen externo.

Si estás agregando el origen a una secuencia de eventos ya publicada, cambia al modo Edición. En la cinta de opciones, seleccione Agregar origen>Conectar orígenes de datos.

Captura de pantalla que muestra las selecciones para agregar orígenes externos.

En la página Seleccionar un origen de datos, busque y seleccione Conectar en el icono de PostgreSQL DB (CDC).

Captura de pantalla que muestra la selección de Azure Database (DB) for PostgreSQL (CDC) como el tipo de origen en el asistente para obtener eventos.

Configuración y conexión a CDC de PostgreSQL Database

Ingesta de datos modificados de bases de datos de PostgreSQL con registro automático de esquemas de tabla a través de CDC en Eventstream.

Nota:

DeltaFlow (versión preliminar): al seleccionar eventos listos para análisis y esquema actualizado automáticamente en el paso de control de esquemas, DeltaFlow transforma los eventos CDC de Debezium sin procesar en flujos listos para análisis que reflejan la estructura de la tabla de origen. DeltaFlow también automatiza la creación de tablas de destino y el control de la evolución del esquema.

  1. En la página Conectar, seleccione Nueva conexión.

    Captura de pantalla que muestra la página Conectar de una base de datos postgreSQL con el vínculo Nueva conexión resaltado.

  2. Escriba la información siguiente en la sección Configuración de la conexión.

    • Servidor: la dirección del servidor de la base de datos de PostgreSQL, por ejemplo, my-pgsql-server.postgres.database.azure.com.

    • Base de datos: el nombre de la base de datos, por ejemplo, my_database.

      Captura de pantalla que muestra la sección Configuración de conexión del conector de base de datos postgreSQL.

    • Nombre de la conexión: escriba un nombre para esta conexión.

    • Tipo de autenticación, seleccione Básico y escriba el nombre de usuario y la contraseña de la base de datos.

      Nota:

      Actualmente, los flujos de eventos de Fabric solo admiten la autenticación Basic.

    • Seleccione Conectar para completar la configuración de conexión. Captura de pantalla que muestra la sección Credenciales de conexión para el conector de base de datos postgreSQL.

  3. Puerto: escriba el número de puerto del servidor. El valor predeterminado es 5432. Si la conexión en la nube seleccionada está configurada en Administrar conexiones y puertas de enlace, asegúrese de que el número de puerto coincide con el establecido allí. Si no coinciden, el número de puerto de la conexión en la nube en Administrar conexiones y puertas de enlace tiene prioridad.

  4. Puede elegir entre dos opciones al capturar cambios de tablas de base de datos:

    • Todas las tablas: capture los cambios de todas las tablas de la base de datos.
    • Escribir nombres de tabla: permite especificar un subconjunto de tablas mediante una lista separada por comas. Puede usar: identificadores de tabla completos en el formato schemaName.tableName o expresiones regulares válidas. Ejemplos:
    • dbo.test.*: seleccione todas las tablas cuyos nombres empiecen por test en el dbo esquema.
    • dbo\.(test1|test2): seleccione dbo.test1 y dbo.test2.

    Puede combinar ambos formatos en la lista. El límite total de caracteres para toda la entrada es de 102 400 caracteres.

  5. Nombre de ranura (opcional): escriba el nombre de la ranura de descodificación lógica de PostgreSQL que se creó para la transmisión de cambios de un complemento específico para una base de datos o esquema determinado. El servidor usa esta ranura para transmitir eventos al conector de streaming de eventstream. Solo debe contener letras minúsculas, números y caracteres de subrayado.

    • Si no se especifica, se usa un GUID para crear la ranura, lo que requiere los permisos de base de datos adecuados.
    • Si existe un nombre de ranura especificado, el conector lo usa directamente.
  6. Expanda Configuración avanzada para acceder a más opciones de configuración para el origen CDC de base de datos postgreSQL:

    • Nombre de la publicación: especifica el nombre de la publicación de replicación lógica de PostgreSQL que se va a usar. Este valor debe coincidir con una publicación existente en la base de datos o se crea automáticamente en función del modo de creación automática. Valor predeterminado: dbz_publication.

      Nota:

      El usuario del conector debe tener permisos de superusuario para crear la publicación. Se recomienda crear la publicación manualmente antes de iniciar el conector por primera vez para evitar problemas relacionados con los permisos.

    • Modo de creación automática de publicación: controla si la publicación se crea automáticamente y cómo se crea la publicación. Entre las opciones se incluyen:

      • Filtered (valor predeterminado): si la publicación especificada no existe, el conector crea uno que incluye solo las tablas seleccionadas (como se especifica en la lista de inclusión de la tabla).
      • AllTables: si existe la publicación especificada, el conector la usa. Si no existe, el conector crea una que incluye todas las tablas de la base de datos.
      • Disabled: El conector no crea una publicación. Si falta la publicación especificada, el conector produce una excepción y se detiene. En este caso, la publicación debe crearse manualmente en la base de datos.

      Para obtener más información, consulte la documentación de Debezium sobre el modo de creación automática de publicaciones.

    • Modo de control decimal: especifica cómo controla el conector los valores de postgreSQL DECIMAL y NUMERIC columna:

      • Precise: representa valores que usan tipos decimales exactos (por ejemplo, Java BigDecimal) para garantizar una precisión y precisión completas en la representación de datos.
      • Double: convierte valores en números de punto flotante de precisión doble. Esta opción mejora la facilidad de uso y el rendimiento, pero puede dar lugar a una pérdida de precisión.
      • String: codifica los valores como cadenas con formato. Esta opción facilita su consumo en sistemas posteriores, pero pierde información semántica sobre el tipo numérico original.
    • Modo de instantánea: especifique los criterios para realizar una instantánea cuando se inicie el conector:

      • Initial: El conector ejecuta una instantánea solo cuando no se registraron offsets para el nombre del servidor lógico o si detecta que no se pudo completar una instantánea anterior. Una vez completada la instantánea, el conector comienza a transmitir registros de eventos para los cambios posteriores de la base de datos.
      • InitialOnly: El conector ejecuta una instantánea solo cuando no se registraron offsets para el nombre del servidor lógico. Una vez completada la instantánea, el conector se detiene. No realiza la transición al streaming para leer los eventos de cambio del binlog.
      • NoData: el conector ejecuta una instantánea que captura solo el esquema, pero no ningún dato de tabla. Establezca esta opción si no necesita una instantánea coherente de los datos, pero solo necesita los cambios que se producen desde que se inicia el conector.
    • Consulta de acción de latido: especifica una consulta que el conector ejecuta en la base de datos de origen cuando el conector envía un mensaje de latido.

    • Anulación de la instrucción select para instantáneas: Especifica qué filas de la tabla se incluirán en una instantánea. Use la propiedad si desea que una instantánea incluya solo un subconjunto de las filas de una tabla. Esta propiedad solo afecta a las instantáneas. No se aplica a los eventos que el conector lee del registro.

Detalles de flujo o origen

  1. En la página Conectar , siga uno de estos pasos en función de si usa Eventstream o Real-Time hub.

    • Eventstream:

      En el panel Detalles del origen de la derecha, siga estos pasos:

      1. En Nombre de origen, seleccione el botón Lápiz para cambiar el nombre.

      2. Observe que el nombre de Eventstream y el nombre de stream son de solo lectura.

    • Centro de Tiempo Real:

      En la sección Detalles de flujo a la derecha, siga estos pasos:

      1. Seleccione el área de trabajo Fabric donde desea crear la secuencia de eventos.

      2. En Nombre del flujo de eventos, seleccione el botón Editar y escriba un nombre para la secuencia de eventos.

      3. El valor de nombre del flujo se genera automáticamente adjuntando -stream al nombre del flujo de eventos. Esta secuencia aparece en la página Todos los flujos de datos del centro en tiempo real cuando finaliza el asistente.

  2. Seleccione Siguiente en la parte inferior de la página Configurar .

Revisión y conexión

En la pantalla Revisar y conectar, revise el resumen y seleccione Agregar (Eventstream) o Conectar (Real-Time hub).

Página de control de esquemas

  1. En el paso Control de esquemas , elija una de las siguientes opciones:

    • Eventos listos para análisis y esquema actualizado automáticamente (vista previa de DeltaFlow): el conector transforma los eventos CDC sin procesar en flujos listos para análisis que reflejan la estructura de la tabla de origen. DeltaFlow enriquece los eventos con metadatos como el tipo de cambio (inserción, actualización o eliminación) y las marcas de tiempo, y administra automáticamente las tablas de destino y la evolución del esquema.
    • Eventos sin procesar de CDC: el conector ingiere y disponibiliza los eventos sin procesar de CDC. Opcionalmente, el conector puede detectar automáticamente esquemas de tabla y registrarlos en el registro de esquemas. Use esta opción cuando desee conocer el esquema sin la transformación DeltaFlow.

    Nota:

    En la captura de pantalla siguiente se muestra Azure SQL Database CDC. Las opciones de control de esquemas son las mismas para todos los conectores de origen CDC admitidos.

    Captura de pantalla que muestra la etapa de gestión de esquemas con las opciones de eventos DeltaFlow y CDC Raw para un conector de fuente CDC.

  2. Habilite la asociación del esquema de eventos.

  3. Para Workspace, seleccione un área de trabajo de Fabric para el conjunto de esquemas.

  4. En Conjunto de esquemas, + Crear está seleccionado de forma predeterminada, lo que crea un nuevo conjunto de esquemas. Puede cambiarlo para seleccionar un conjunto de esquemas de eventos existente.

  5. Si seleccionó la opción + Crear en el paso anterior, escriba un nombre para el conjunto de esquemas.

  6. En la página Revisar y conectar, revise el resumen y, a continuación, seleccione Agregar (Eventstream) o Conectar (Centro en tiempo real).

    Captura de pantalla que muestra la página Revisar y crear para el conector de base de datos postgreSQL con características extendidas.

    Para todas las tablas o tablas seleccionadas de la base de datos postgreSQL, el conector detecta automáticamente y crea esquemas y los registra con el registro de esquema.

DeltaFlow: transformación de eventos listos para análisis (versión preliminar)

Al habilitar eventos listos para analytics y esquema actualizado automáticamente (DeltaFlow), el conector proporciona las siguientes funcionalidades:

  • Forma de evento lista para análisis: los eventos CDC de Debezium sin procesar se transforman en un formato tabular que refleja la estructura de la tabla de origen. Los eventos se enriquecen con columnas de metadatos, incluido el tipo de cambio (insert, updateo delete) y la marca de tiempo del evento.
  • Administración automática de tablas de destino: al enrutar secuencias habilitadas para DeltaFlow a un destino admitido como un centro de eventos, las tablas de destino se crean automáticamente para que coincidan con el esquema de la tabla de origen. No es necesario crear ni configurar manualmente tablas de destino.
  • Control de evolución del esquema: cuando cambian las tablas de base de datos de origen (por ejemplo, se agregan nuevas columnas o se crean tablas), DeltaFlow detecta automáticamente los cambios, actualiza los esquemas registrados y ajusta las tablas de destino en consecuencia. Este comportamiento minimiza la intervención manual causada por cambios de esquema.

Nota:

DeltaFlow (versión preliminar) es compatible actualmente con conectores de origen CDC de Azure SQL Database, Azure SQL Managed Instance, SQL Server en máquina virtual, y PostgreSQL.

Para más información sobre cómo DeltaFlow transforma los eventos CDC sin procesar en una salida lista para el análisis, incluidos los tipos de operación y las columnas de metadatos, consulte Transformación de salida de DeltaFlow.

Visualización de la secuencia de eventos actualizado

  1. Puede ver el origen de CDC de PostgreSQL Database agregado a la secuencia de eventos en el modo Edición.

    Captura de pantalla del origen CDC de PostgreSQL DB en streaming en la vista Edición con características extendidas.

  2. Para implementar este origen de CDC de base de datos de PostgreSQL recién agregado, seleccione Publicar. Después de completar estos pasos, la fuente de PostgreSQL DB para CDC está disponible para su visualización en la vista en directo.

    Captura de pantalla de la fuente CDC de DB PostgreSQL en streaming, en la vista en vivo, con funcionalidades extendidas.

Configuración de destinos de Eventstream para usar esquemas

Actualmente, solo se admiten los destinos de eventhouse, punto de conexión personalizado y flujo derivado para Eventstreams con esquemas asociados. En esta sección se muestra cómo agregar y configurar un destino de Eventhouse cuando las características extendidas (como la compatibilidad con esquemas) están habilitadas para la secuencia de eventos.

Nota:

Cuando se usa DeltaFlow (versión preliminar) con un origen de captura de datos modificados (CDC) compatible, las tablas de destino de Eventhouse se crean y administran automáticamente para que coincidan con la estructura de la tabla de origen. No es necesario configurar manualmente el esquema de la tabla de destino. DeltaFlow también controla la evolución del esquema automáticamente cuando cambian las tablas de origen.

Configuración de un esquema para un destino de punto de conexión personalizado

  1. Seleccione Transformar eventos o agregar destino y, a continuación, seleccione CustomEndpoint.

  2. En el panel Punto de conexión personalizado , especifique un nombre para el destino.

  3. En Esquema de entrada, seleccione el esquema para los eventos. Al habilitar la compatibilidad de esquemas para una secuencia de eventos, se realiza una selección en este cuadro.

Captura de pantalla que muestra el panel para configurar un punto de conexión personalizado.

Para obtener pasos detallados sobre cómo configurar un destino de punto de conexión personalizado, consulte Incorporación de un punto de conexión personalizado o un destino de aplicación personalizado a una secuencia de eventos.

Configuración de esquemas para un destino de eventhouse

  1. Seleccione Transformar eventos o agregar destino y, a continuación, seleccione Eventhouse.

  2. En el panel Eventhouse , configure las siguientes opciones relacionadas con el esquema:

    1. En Esquema de entrada, seleccione uno o varios esquemas en la lista desplegable.

      Captura de pantalla que muestra el panel de configuración del centro de eventos con un esquema de entrada seleccionado.

      Nota:

      Si seleccionó la opción Esquema dinámico a través de encabezados al configurar un origen de Event Hubs, es posible que haya configurado varios esquemas para el origen y los haya asignado a varias propiedades y sus valores.

    2. En Método de creación de tablas, seleccione Una sola tabla con todos los esquemas combinados o Tablas independientes para cada esquema, en función de sus requisitos.

      Captura de pantalla que muestra el panel de configuración del centro de eventos con métodos de creación de tablas.

    3. En Escribir datos con, seleccione una de las siguientes opciones:

      • Solo datos de carga: Guarde los datos extraídos del payload en la tabla. Si hay varios esquemas de entrada, los datos se envían a varias tablas.
      • Metadatos y carga: incluya metadatos y datos de carga en una tabla única. Las columnas de ejemplo incluyen source , subject, typey data.

      Captura de pantalla que muestra el panel de configuración del centro de eventos con las opciones para escribir datos.

Para obtener pasos detallados sobre cómo configurar un destino de un centro de eventos, consulte Incorporación de un destino de un centro de eventos a una secuencia de eventos.

Visualización de la salida lista para análisis de DeltaFlow (versión preliminar)

Si ha habilitado eventos listos para análisis y esquema actualizado automáticamente (DeltaFlow), las tablas de destino se crean automáticamente en una forma que refleja las tablas de base de datos de origen. Cada tabla incluye las columnas originales junto con las columnas de metadatos para el tipo de cambio y la marca de tiempo.

Nota:

En la captura de pantalla siguiente se muestra Azure SQL Database CDC. La salida de la tabla de destino deltaFlow es la misma para todos los conectores de origen CDC admitidos.

Captura de pantalla que muestra las tablas de destino de Eventhouse creadas por DeltaFlow en forma lista para análisis.

Puede consultar estas tablas mediante el lenguaje de consulta Kusto (KQL) u otras herramientas de análisis, sin tener que analizar las cargas sin procesar de CDC Debezium.

Otros conectores: