Hinzufügen einer PostgreSQL Datenbank CDC-Quelle zu einem Eventstream

In diesem Artikel erfahren Sie, wie Sie eine PostgreSQL-Datenbank Change Data Capture (CDC)-Quelle zu einem Eventstream hinzufügen können.

Mit dem Quellconnector für die Datenänderungserfassung (CDC) in PostgreSQL für Microsoft Fabric-Ereignisdatenströme können Sie eine Momentaufnahme der aktuellen Daten in einer PostgreSQL-Datenbank erfassen. Derzeit wird PostgreSQL Database Change Data Capture (CDC) von den folgenden Diensten unterstützt, auf die öffentlich zugegriffen werden kann, um die Datenbanken abzurufen:

  • Azure-Datenbank für PostgreSQL
  • Amazon RDS für PostgreSQL
  • Amazon Aurora PostgreSQL
  • Google Cloud SQL für PostgreSQL

Sobald die CdC-Quelle der PostgreSQL-Datenbank dem Eventstream hinzugefügt wurde, erfasst sie Änderungen auf Zeilenebene an den angegebenen Tabellen. Diese Änderungen können dann in Echtzeit verarbeitet und zur weiteren Analyse an verschiedene Ziele gesendet werden.

Hinweis

Mit DeltaFlow (Vorschau) können Sie unformatierte Debezium-CDC-Ereignisse in analysefähige Datenströme umwandeln, die Ihre Quelltabellenstruktur spiegeln. DeltaFlow automatisiert schemaregistrierung, Zieltabellenverwaltung und Schemaentwicklungsbehandlung. Um DeltaFlow zu verwenden, wählen Sie während des Schemabehandlungsschritts analysebereite Ereignisse und automatisch aktualisiertes Schema aus.

Voraussetzungen

Aktivieren von CDC in Ihrer PostgreSQL-Datenbank

In diesem Abschnitt wird Azure Database for PostgreSQL als Beispiel verwendet.

Führen Sie die folgenden Schritte aus, um CDC in Ihrem Azure Database for PostgreSQL Flexible Server zu aktivieren:

  1. Wählen Sie auf ihrer Azure Database for PostgreSQL Seite "Flexibler Server" im Azure-Portal im Navigationsmenü Serverparameter aus.

  2. Auf der Seite Serverparameter:

    • Setzen Sie wal_level auf logisch.
    • Aktualisieren Sie max_worker_processes auf mindestens 16.

    Screenshot der Aktivierung von CDC für die Bereitstellung eines flexiblen Servers.

  3. Speichern Sie die Änderungen, und starten Sie den Server neu.

  4. Vergewissern Sie sich, dass Ihre Azure Database for PostgreSQL flexible Serverinstanz öffentlichen Netzwerkdatenverkehr zulässt.

  5. Erteilen Sie dem Administratorbenutzer Replikationsberechtigungen durch Ausführen der folgenden SQL-Anweisung. Wenn Sie ein anderes Benutzerkonto verwenden möchten, um Ihre PostgreSQL-Datenbank (DB) zum Abrufen von CDC herzustellen, stellen Sie sicher, dass der Benutzer der Tabellenbesitzer ist.

    ALTER ROLE <admin_user_or_table_owner_user> WITH REPLICATION;
    

Assistent zum Auswählen einer Datenquelle starten

Wenn Sie Ihrem Eventstream noch keine Quelle hinzugefügt haben, wählen Sie die Kachel "Datenquellen verbinden" aus . Sie können auch Quelle hinzufügen>Datenquellen verbinden im Menüband auswählen.

Screenshot, der die Auswahl der Kachel für die Verwendung einer externen Quelle zeigt.

Wenn Sie die Quelle zu einem bereits veröffentlichten Ereignisstream hinzufügen, wechseln Sie zum Bearbeitungsmodus . Wählen Sie im Menüband "Quelle hinzufügen; Datenquellen verbinden" aus.

Screenshot der Auswahl zum Hinzufügen externer Quellen.

Suchen Sie auf der Seite "Datenquelle auswählen" auf der Kachel "PostgreSQL DB (CDC)" nach "Verbinden", und wählen Sie "Verbinden" aus.

Screenshot, das die Auswahl von Azure Database (DB) für PostgreSQL (CDC) als Quelltyp im Assistenten zum Abrufen von Ereignissen zeigt.

Konfigurieren und Herstellen einer Verbindung mit dem CdC der PostgreSQL-Datenbank

Erfassen von Änderungsdaten aus PostgreSQL-Datenbanken mit automatischer Tabellenschemaregistrierung über CDC in Eventstream.

Hinweis

DeltaFlow (Vorschau):Wenn Sie Analysefähige Ereignisse und automatisch aktualisiertes Schema im Schemabehandlungsschritt auswählen, transformiert DeltaFlow rohe Debezium CDC-Ereignisse in analysefähige Datenströme, die Ihre Quelltabellenstruktur spiegeln. DeltaFlow automatisiert auch die Erstellung von Zieltabellen und die Behandlung der Schemaentwicklung.

  1. Wählen Sie auf der Seite Verbinden die Option Neue Verbindung aus.

    Screenshot der Seite

  2. Geben Sie im Abschnitt Verbindungseinstellungen die folgende Informationen ein.

    • Server: Die Serveradresse Ihrer PostgreSQL-Datenbank, z. B. my-pgsql-server.postgres.database.azure.com.

    • Datenbank: Der Datenbankname, z. B. my_database.

      Screenshot des Abschnitts

    • Verbindungsname: Geben Sie einen Namen für die Verbindung ein.

    • Authentifizierungsart, Wählen Sie "Einfach " aus, und geben Sie Ihren Benutzernamen und Ihr Kennwort für die Datenbank ein.

      Hinweis

      Derzeit unterstützen Fabric Ereignisdatenströme nur Basicauthentifizierung.

    • Wählen Sie "Verbinden" aus, um die Verbindungseinstellungen abzuschließen. Screenshot des Abschnitts

  3. Port: Geben Sie die Portnummer Ihres Servers ein. Der Standardwert ist 5432. Wenn Ihre ausgewählte Cloudverbindung in "Verbindungen und Gateways verwalten" konfiguriert ist, stellen Sie sicher, dass die Portnummer dem dort festgelegten Entspricht. Wenn sie nicht übereinstimmen, haben die Portnummer in der Cloudverbindung unter "Verbindungen und Gateways verwalten " Vorrang.

  4. Sie können zwischen zwei Optionen beim Erfassen von Änderungen aus Datenbanktabellen wählen:

    • Alle Tabellen: Erfassen Sie Änderungen aus jeder Tabelle in der Datenbank.
    • Geben Sie Tabellennamen ein: Ermöglicht ihnen, eine Teilmenge von Tabellen mithilfe einer durch Trennzeichen getrennten Liste anzugeben. Sie können entweder: vollständige Tabellenbezeichner im Format schemaName.tableName oder gültige reguläre Ausdrücke verwenden. Beispiele
    • dbo.test.*: Wählen Sie alle Tabellen aus, deren Namen mit test im dbo Schema beginnen.
    • dbo\.(test1|test2): Auswählen dbo.test1 und dbo.test2.

    Sie können beide Formate in der Liste kombinieren. Die Gesamtzeichenbeschränkung für den gesamten Eintrag beträgt 102.400 Zeichen.

  5. Slotname (optional): Geben Sie den Namen des logischen Dekodierungsslots in PostgreSQL ein, der für die Streaming-Änderungen eines bestimmten Plug-ins für eine bestimmte Datenbank oder ein bestimmtes Schema erstellt wurde. Der Server verwendet diesen Steckplatz, um Ereignisse zum Eventstream-Streamingconnector zu streamen. Sie darf nur Kleinbuchstaben, Zahlen und Unterstriche enthalten.

    • Wenn nicht angegeben, wird eine GUID verwendet, um den Steckplatz zu erstellen, der die entsprechenden Datenbankberechtigungen erfordert.
    • Wenn ein angegebener Steckplatzname vorhanden ist, verwendet der Verbinder ihn direkt.
  6. Erweitern Sie erweiterte Einstellungen , um auf weitere Konfigurationsoptionen für die CdC-Quelle der PostgreSQL-Datenbank zuzugreifen:

    • Publikationsname: Gibt den Namen der zu verwendenden logischen Replikationsveröffentlichung von PostgreSQL an. Dieser Wert muss mit einer vorhandenen Publikation in der Datenbank übereinstimmen, oder er wird abhängig vom automatischen Erstellungsmodus automatisch erstellt. Standardwert: dbz_publication.

      Hinweis

      Der Connectorbenutzer muss über Superuserberechtigungen verfügen, um die Publikation zu erstellen. Es wird empfohlen, die Publikation manuell zu erstellen, bevor Sie den Connector zum ersten Mal starten, um Berechtigungsbezogene Probleme zu vermeiden.

    • Automatischer Publikations-Erstellungsmodus: Steuert, ob und wie die Publikation automatisch erstellt wird. Zu den Optionen gehören:

      • Filtered (Standard): Wenn die angegebene Publikation nicht vorhanden ist, erstellt der Connector eine, die nur die ausgewählten Tabellen enthält, wie in der Tabellen-Auswahlliste angegeben.
      • AllTables: Wenn die angegebene Publikation vorhanden ist, verwendet der Connector sie. Wenn er nicht vorhanden ist, erstellt der Connector einen, der alle Tabellen in der Datenbank enthält.
      • Disabled: Der Connector erstellt keine Publikation. Wenn die angegebene Publikation fehlt, löst der Connector eine Ausnahme aus und stoppt. In diesem Fall muss die Publikation manuell in der Datenbank erstellt werden.

      Weitere Informationen finden Sie in der Debezium-Dokumentation zum automatischen Erstellen im Publikationsmodus.

    • Dezimalbehandlungsmodus: Gibt an, wie der Verbinder PostgreSQL DECIMAL und NUMERIC Spaltenwerte verarbeitet:

      • Precise: Stellt Werte mit exakten Dezimaltypen dar (z. B. Java BigDecimal), um die volle Genauigkeit und Genauigkeit in der Datendarstellung sicherzustellen.
      • Double: Wandelt Werte in Gleitkommazahlen mit doppelter Genauigkeit um. Diese Option verbessert die Benutzerfreundlichkeit und Leistung, kann aber zu einem Genauigkeitsverlust führen.
      • String: Codiert Werte als formatierte Zeichenfolgen. Diese Option erleichtert die Nutzung in nachgelagerten Systemen, verliert jedoch semantische Informationen über den ursprünglichen numerischen Typ.
    • Momentaufnahmemodus: Geben Sie die Kriterien für das Ausführen einer Momentaufnahme an, wenn der Connector gestartet wird:

      • Initial: Der Connector führt einen Schnappschuss nur dann aus, wenn keine Offsets für den logischen Servernamen aufgezeichnet wurden oder wenn er erkennt, dass ein früherer Schnappschuss nicht abgeschlossen wurde. Nach Abschluss der Momentaufnahme beginnt der Connector, Ereignisdatensätze für nachfolgende Datenbankänderungen zu streamen.
      • InitialOnly: Der Connector führt nur eine Momentaufnahme aus, wenn für den logischen Servernamen keine Offsets aufgezeichnet wurden. Nach Abschluss der Momentaufnahme stoppt der Verbinder. Es wechselt nicht zum Streaming, um Änderungsereignisse aus dem Binlog zu lesen.
      • NoData: Der Connector führt eine Momentaufnahme aus, die nur das Schema, aber keine Tabellendaten erfasst. Legen Sie diese Option fest, wenn Sie keine konsistente Momentaufnahme der Daten benötigen, aber sie benötigen nur die Änderungen, die seit dem Start des Connectors vorgenommen werden.
    • Heartbeat-Abfrage: Gibt eine Abfrage an, die der Connector in der Quelldatenbank ausführt, wenn er eine Heartbeat-Nachricht sendet.

    • Außerkraftsetzung der Snapshot select-Anweisung: Gibt die Tabellenzeilen an, die in eine Momentaufnahme eingeschlossen werden sollen. Verwenden Sie die Eigenschaft, wenn eine Momentaufnahme nur eine Teilmenge der Zeilen in einer Tabelle enthalten soll. Diese Eigenschaft wirkt sich nur auf Momentaufnahmen aus. Sie gilt nicht für Ereignisse, die der Connector aus dem Protokoll liest.

Stream- oder Quelldetails

  1. Führen Sie auf der Seite "Verbinden " einen dieser Schritte aus, je nachdem, ob Sie Eventstream oder Real-Time Hub verwenden.

    • Eventstream:

      Führen Sie im Bereich " Quelldetails " rechts die folgenden Schritte aus:

      1. Um den Quellnamen zu ändern, wählen Sie das Stiftsymbol aus.

      2. Beachten Sie, dass der Eventstream-Name und der Stream-Name schreibgeschützt sind.

    • Real-Time Hub:

      Führen Sie im Abschnitt Datenstromdetails die folgenden Schritte aus:

      1. Wählen Sie den arbeitsbereich Fabric aus, in dem Sie den Ereignisstream erstellen möchten.

      2. Wählen Sie für den Eventstream-Namen die Stiftschaltfläche aus, und geben Sie einen Namen für den Eventstream ein.

      3. Der Wert für den Stream-Namen wird automatisch generiert, indem "-stream " an den Namen des Ereignisstreams angefügt wird. Dieser Datenstrom wird auf der Seite "Alle Datenströme" des Echtzeithubs angezeigt, wenn der Assistent beendet wird.

  2. Wählen Sie unten auf der Seite "Konfigurieren" die Option "Weiter" aus.

Überprüfen und Verbinden

Überprüfen Sie auf dem Bildschirm "Überprüfen + Verbinden " die Zusammenfassung, und wählen Sie "Hinzufügen " (Eventstream) oder "Verbinden " (Real-Time Hub) aus.

Schemabehandlungsseite

  1. Wählen Sie im Schemabehandlungsschritt eine der folgenden Optionen aus:

    • Analysefähige Ereignisse und automatisch aktualisiertes Schema (DeltaFlow Preview): Der Connector wandelt unformatierte CDC-Ereignisse in analysefähige Datenströme um, die Ihre Quelltabellenstruktur spiegeln. DeltaFlow erweitert Ereignisse mit Metadaten wie Änderungstyp (Einfügen, Aktualisieren oder Löschen) und Zeitstempeln und verwaltet automatisch Zieltabellen und Schemaentwicklung.
    • Roh-CDC-Ereignisse: Der Connector importiert und stellt die Roh-CDC-Ereignisse zur Verfügung. Optional kann der Connector Tabellenschemas automatisch bestimmen und in der Schemaregistrierung registrieren. Verwenden Sie diese Option, wenn Sie die Schemasensibilisierung ohne DeltaFlow-Transformation wünschen.

    Hinweis

    Der folgende Screenshot zeigt Azure SQL Database CDC. Die Schemabehandlungsoptionen sind für alle unterstützten CDC-Quellconnectors identisch.

    Screenshot des Schemabehandlungsschritts mit DeltaFlow- und Raw CDC-Ereignisoptionen für einen CDC-Quellconnector.

  2. Aktivieren Sie die Ereignisschemazuordnung.

  3. Wählen Sie für Workspace einen Fabric Arbeitsbereich für den Schemasatz aus.

  4. Bei Schemasatz ist +Erstellen standardmäßig ausgewählt, wodurch ein neuer Schemasatz erstellt wird. Sie können ihn ändern, um einen vorhandenen Ereignisschemasatz auszuwählen.

  5. Wenn Sie im vorherigen Schritt die Option +Erstellen ausgewählt haben, geben Sie einen Namen für den Schemasatz ein.

  6. Überprüfen Sie auf der Seite "Überprüfen + Verbinden " die Zusammenfassung, und wählen Sie dann "Hinzufügen " (Eventstream) oder "Verbinden " (Real-Time Hub) aus.

    Screenshot der Seite

    Für alle Tabellen oder ausgewählte Tabellen in der PostgreSQL-Datenbank erkennt der Connector automatisch und erstellt Schemas, die mit dem Schemaregister registriert werden.

DeltaFlow: Analysebereite Ereignistransformation (Vorschau)

Wenn Sie Analysefähige Ereignisse und automatisch aktualisiertes Schema (DeltaFlow) aktivieren, bietet der Connector die folgenden Funktionen:

  • Für Analysen geeignete Ereignisstruktur: Rohe Debezium CDC-Ereignisse werden in ein tabellarisches Format umgewandelt, das die Struktur der Quelltabelle widerspiegelt. Ereignisse werden mit Metadatenspalten erweitert, einschließlich des Änderungstyps (insert, updateoder delete) und des Ereigniszeitstempels.
  • Automatische Verwaltung der Zieltabelle: Wenn Sie DeltaFlow-fähige Datenströme an ein unterstütztes Ziel wie ein Eventhouse weiterleiten, werden Zieltabellen automatisch erstellt, um dem Quelltabellenschema zu entsprechen. Sie müssen keine Zieltabellen manuell erstellen oder konfigurieren.
  • Schemaentwicklungsbehandlung: Wenn sich Quelldatenbanktabellen ändern (z. B. werden neue Spalten hinzugefügt oder Tabellen erstellt), erkennt DeltaFlow automatisch die Änderungen, aktualisiert die registrierten Schemas und passt die Zieltabellen entsprechend an. Dieses Verhalten minimiert den manuellen Eingriff durch Schemaänderungen.

Hinweis

DeltaFlow (Vorschau) wird derzeit mit den Azure SQL Database CDC, Azure SQL Managed Instance CDC, SQL Server auf virtueller Maschine CDC und PostgreSQL CDC-Quell-Connectors unterstützt.

Ausführliche Informationen dazu, wie DeltaFlow rohe CDC-Ereignisse in analysebereite Ausgaben transformiert, einschließlich Vorgangstypen und Metadatenspalten, finden Sie unter DeltaFlow-Ausgabetransformation.

Anzeigen des aktualisierten Eventstreams

  1. Sie können die PostgreSQL-Datenbank-CDC-Quelle sehen, die Ihrem Eventstream im Bearbeitungsmodus hinzugefügt wurde.

    Screenshot des Streamings der PostgreSQL DB CDC-Quelle in der Bearbeitungsansicht mit erweiterten Features.

  2. Um diese neu hinzugefügte PostgreSQL-Datenbank CDC-Quelle zu implementieren, wählen Sie Veröffentlichen. Nachdem Sie diese Schritte durchgeführt haben, steht Ihre PostgreSQL-Datenbank CDC-Quelle in der Live-Ansicht zur Visualisierung zur Verfügung.

    Screenshot des Streamings der PostgreSQL DB CDC-Quelle in der Liveansicht mit erweiterten Features.

Konfigurieren von Eventstream-Zielen für die Verwendung von Schemas

Derzeit werden nur die Eventhouse-, benutzerdefinierten endpunkt- und abgeleiteten Datenstromziele für Eventstreams mit zugeordneten Schemas unterstützt. In diesem Abschnitt wird gezeigt, wie Sie ein Eventhouse-Ziel hinzufügen und konfigurieren, wenn erweiterte Features (z. B. die Schemaunterstützung) für den Eventstream aktiviert sind.

Hinweis

Wenn Sie DeltaFlow (Vorschau) mit einer unterstützten CDC-Quelle (Change Data Capture) verwenden, werden Zieltabellen im Eventhouse automatisch erstellt und verwaltet, um der Quelltabellenstruktur zu entsprechen. Sie müssen das Zieltabellenschema nicht manuell konfigurieren. DeltaFlow behandelt auch die Schemaentwicklung automatisch, wenn sich Quelltabellen ändern.

Konfigurieren eines Schemas für ein benutzerdefiniertes Endpunktziel

  1. Wählen Sie Transformationsereignisse aus, oder fügen Sie das Ziel hinzu, und wählen Sie dann "CustomEndpoint" aus.

  2. Geben Sie im Bereich "Benutzerdefinierter Endpunkt " einen Namen für das Ziel an.

  3. Wählen Sie für das Eingabeschema das Schema für Ereignisse aus. Sie treffen eine Auswahl in diesem Feld, wenn Sie die Schemaunterstützung für einen Eventstream aktivieren.

Screenshot des Bereichs zum Konfigurieren eines benutzerdefinierten Endpunkts.

Ausführliche Schritte zum Konfigurieren eines benutzerdefinierten Endpunktziels finden Sie unter Hinzufügen eines benutzerdefinierten Endpunkts oder benutzerdefinierten App-Ziels zu einem Eventstream.

Schemas für ein Eventhouse-Ziel konfigurieren

  1. Wählen Sie Transformationsereignisse aus, oder fügen Sie das Ziel hinzu, und wählen Sie dann Eventhouse aus.

  2. Konfigurieren Sie im Eventhouse-Bereich die folgenden schemabezogenen Einstellungen:

    1. Wählen Sie für das Eingabeschema ein oder mehrere Schemas aus der Dropdownliste aus.

      Screenshot des Ereignishauskonfigurationsbereichs mit ausgewähltem Eingabeschema.

      Hinweis

      Wenn Sie das dynamische Schema über die Headeroption beim Konfigurieren einer Event Hubs-Quelle ausgewählt haben, haben Sie möglicherweise mehrere Schemas für die Quelle konfiguriert und sie verschiedenen Eigenschaften und deren Werten zugeordnet.

    2. Wählen Sie für die Tabellenerstellungsmethode entweder eine einzelne Tabelle, in der alle Schemata kombiniert werden, oder separate Tabellen für jedes einzelne Schema aus, je nach Ihren Anforderungen.

      Screenshot des Ereignishaus-Konfigurationsbereichs mit Tabellenerstellungsmethoden.

    3. Wählen Sie für Write data with eine der folgenden Optionen aus:

      • Nur Nutzlast: Schreiben extrahierter Nutzlastdaten in die Tabelle. Wenn mehrere Eingabeschemas vorhanden sind, werden Daten an mehrere Tabellen gesendet.
      • Metadaten und Nutzlast: Schreiben von Metadaten und Nutzlastdaten in eine einzelne Tabelle. Beispielspalten umfassen source , , subject, typeund data.

      Screenshot des Ereignishauskonfigurationsbereichs mit den Optionen zum Schreiben von Daten.

Ausführliche Schritte zum Konfigurieren eines Eventhouse-Ziels finden Sie unter Hinzufügen eines Eventhouse-Ziels zu einem Eventstream.

Anzeigen der ausgabebereiten DeltaFlow-Analyse (Vorschau)

Wenn Sie Analytics-ready-Ereignisse und ein automatisch aktualisiertes Schema (DeltaFlow) aktiviert haben, werden die Zieltabellen automatisch in einer Form erstellt, die die Quelldatenbanktabellen widerspiegelt. Jede Tabelle enthält die ursprünglichen Spalten zusammen mit Metadatenspalten für den Änderungstyp und den Zeitstempel.

Hinweis

Der folgende Screenshot zeigt Azure SQL Database CDC. Die Ausgabe der DeltaFlow-Zieltabelle ist für alle unterstützten CDC-Quell-Connectors identisch.

Screenshot der Eventhouse-Zieltabellen, die von DeltaFlow in analysebereiter Form erstellt wurden.

Sie können diese Tabellen mithilfe von Kusto Query Language (KQL) oder anderen Analysetools abfragen, ohne rohe Debezium CDC-Nutzlasten analysieren zu müssen.

Andere Konnektoren: