イベントストリームに PostgreSQL データベース CDC ソースを追加

この記事では,PostgreSQL データベースの変更データキャプチャ (CDC) ソースをイベントストリームに追加する方法を説明します。

Microsoft Fabric イベント ストリーム用の PostgreSQL Database Change Data Capture (CDC) ソース コネクタを使用すると、PostgreSQL データベース内の現在のデータのスナップショットをキャプチャできます。 現在、PostgreSQL データベース変更データ キャプチャ (CDC) は、データベースにパブリックにアクセスできる次のサービスからサポートされています。

  • Azure Database for PostgreSQL
  • Amazon RDS for PostgreSQL
  • Amazon Aurora PostgreSQL
  • Google Cloud SQL for PostgreSQL

PostgreSQL データベース CDC ソースがイベントストリームに追加されると、指定したテーブルに対する行レベルの変更がキャプチャされます。 その後、これらの変更をリアルタイムで処理し、別の宛先に送信して詳細な分析を行うことができます。

DeltaFlow (プレビュー) を使用すると、未加工の Debezium CDC イベントを、ソース テーブル構造をミラーリングする分析対応ストリームに変換できます。 DeltaFlow を使用すると、スキーマの登録、宛先テーブルの管理、スキーマの進化の処理が自動化されます。 DeltaFlow を使用するには、スキーマ処理手順中に [Analytics 対応イベント] と [スキーマの自動更新 ] を選択します。

前提条件

PostgreSQL データベースで CDC を有効にする

このセクションでは、例として Azure Database for PostgreSQL を使用します。

Azure Database for PostgreSQL フレキシブル サーバーで CDC を有効にするには、次の手順に従います。

  1. Azure ポータルの [Azure Database for PostgreSQL フレキシブル サーバー] ページで、ナビゲーション メニューの [Server parameters を選択します。

  2. サーバーパラメータページで

    • wal_levellogicalに設定します。
    • max_worker_processesを少なくとも16 に更新します。

    柔軟なサーバー展開のために CDC を有効にするスクリーンショット。

  3. 変更点を保存し、サーバーを再起動します。

  4. Azure Database for PostgreSQLフレキシブル サーバー インスタンスでパブリック ネットワーク トラフィックが許可されていることを確認します。

  5. 次の SQL ステートメントを実行して、管理者 ユーザーにレプリケーションアクセス許可権限を許可します。 他のユーザー アカウントを使用して PostgreSQL データベース (DB) を接続して CDC をフェッチする場合は、ユーザーが テーブル所有者であることを確認します。

    ALTER ROLE <admin_user_or_table_owner_user> WITH REPLICATION;
    

データ ソースの選択ウィザードを起動する

まだ eventstream にソースを追加していない場合は、[ データ ソースの接続 ] タイルを選択します。 リボンの [ ソースの追加>データ ソースの接続 ] を選択することもできます。

外部ソースを使用するためのタイルの選択を示すスクリーンショット。

既に公開されているイベントストリームにソースを追加する場合は、 編集 モードに切り替えます。 リボンで、[ソースの追加] >[データ ソースの接続] を選択します。

外部ソースを追加するための選択を示すスクリーンショット。

[データ ソースの選択] ページで、PostgreSQL DB (CDC) タイルで [接続] を検索して選択します。

Get イベント ウィザードでソースの種類として Azure Database (DB) for PostgreSQL (CDC) を選択したことを示すスクリーンショット。

PostgreSQL Database CDC の構成と接続

CDC を介して Eventstream にテーブル スキーマを自動登録して PostgreSQL データベースから変更データを取り込みます。

DeltaFlow (プレビュー):スキーマ処理手順で Analytics 対応イベントと自動更新されたスキーマ を選択すると、DeltaFlow は未加工の Debezium CDC イベントを、ソース テーブル構造をミラーリングする分析対応ストリームに変換します。 DeltaFlow では、変換先テーブルの作成とスキーマの進化処理も自動化されます。

  1. 接続ページで、新しい接続 を選択します。

    [新しい接続] リンクが強調表示されている PostgreSQL データベースの [接続] ページを示すスクリーンショット。

  2. 接続設定セクションで、次の情報を入力します。

    • サーバー: PostgreSQL データベースのサーバー アドレス (my-pgsql-server.postgres.database.azure.com など)。

    • データベース: データベース名 (my_database など)。

      PostgreSQL データベース コネクタの [接続設定] セクションを示すスクリーンショット。

    • 接続名: 接続の名前を入力します。

    • [認証の種類] で [ 基本 ] を選択し、データベースの ユーザー名パスワード を入力します。

      現在、Fabric イベント ストリームでは、Basic 認証のみがサポートされています。

    • [ 接続 ] を選択して接続設定を完了します。 PostgreSQL データベース コネクタの [接続資格情報] セクションを示すスクリーンショット。

  3. ポート: サーバーのポート番号を入力します。 既定値は 5432 です。 選択したクラウド接続が [接続とゲートウェイの管理] で構成されている場合は、ポート番号がそこに設定されているポート番号と一致していることを確認します。 一致しない場合は、[ 接続とゲートウェイの管理 ] のクラウド接続のポート番号が優先されます。

  4. データベース テーブルから変更をキャプチャする場合は、次の 2 つのオプションから選択できます。

    • すべてのテーブル: データベース内のすべてのテーブルからの変更をキャプチャします。
    • テーブル名の入力: コンマ区切りリストを使用して、テーブルのサブセットを指定できます。 schemaName.tableName形式の完全なテーブル識別子または有効な正規表現のいずれかを使用できます。 例:
    • dbo.test.*: test スキーマで名前がdboで始まるすべてのテーブルを選択します。
    • dbo\.(test1|test2): dbo.test1dbo.test2を選択します。

    リスト内の両方の形式を組み合わせることができます。 エントリ全体の文字制限の合計は 102,400 文字です。

  5. スロット名 (省略可能): 特定のデータベース/スキーマの特定のプラグインからの変更をストリーミングするために作成された PostgreSQL 論理デコード スロットの名前を入力します。 サーバーはこのスロットを使用して、イベントを Eventstream ストリーミング コネクタにストリーミングします。 小文字、数字、アンダースコアのみを含める必要があります。

    • 指定しない場合は、適切なデータベースのアクセス許可を必要とする、スロットの作成に GUID が使用されます。
    • 指定されたスロット名が存在する場合、コネクタはそれを直接使用します。
  6. [ 詳細設定] を展開して、PostgreSQL データベース CDC ソースのその他の構成オプションにアクセスします。

    • パブリケーション名: 使用する PostgreSQL 論理レプリケーション パブリケーションの名前を指定します。 この値は、データベース内の既存のパブリケーションと一致する必要があります。または、自動作成モードに応じて自動的に作成されます。 既定値: dbz_publication

      コネクタ ユーザーは、パブリケーションを作成するためのスーパーユーザーのアクセス許可を持っている必要があります。 アクセス許可関連の問題を回避するために、コネクタを初めて起動する前に、パブリケーションを手動で作成することをお勧めします。

    • パブリケーションの自動作成モード: パブリケーションを自動的に作成するかどうかを制御します。 オプションは次のとおりです。

      • Filtered (既定値): 指定したパブリケーションが存在しない場合、コネクタは選択したテーブルのみを含むパブリケーションを作成します (テーブルインクルード リストで指定されています)。
      • AllTables: 指定したパブリケーションが存在する場合、コネクタはそれを使用します。 存在しない場合、コネクタはデータベース内のすべてのテーブルを含むテーブルを作成します。
      • Disabled: コネクタはパブリケーションを作成しません。 指定したパブリケーションがない場合、コネクタはエラーをスローして停止します。 この場合、パブリケーションをデータベースに手動で作成する必要があります。

      詳細については、パブリケーションの自動作成モードに関する Debezium ドキュメントを参照してください。

    • 10 進処理モード: コネクタが PostgreSQL DECIMALNUMERIC 列の値を処理する方法を指定します。

      • Precise: 正確な 10 進型 (Java BigDecimal など) を使用して値を表し、データ表現の完全な精度と精度を確保します。
      • Double: 値を倍精度浮動小数点数に変換します。 このオプションを選択すると、使いやすさとパフォーマンスが向上しますが、精度が低下する可能性があります。
      • String: 値を書式設定された文字列としてエンコードします。 このオプションを使用すると、ダウンストリーム システムで簡単に使用できますが、元の数値型に関するセマンティック情報は失われます。
    • スナップショット モード: コネクタの起動時にスナップショットを実行するための条件を指定します。

      • Initial: コネクタは、論理サーバー名のオフセットが記録されなかった場合、または以前のスナップショットが完了しなかったことを検出した場合にのみ、スナップショットを実行します。 スナップショットが完了すると、コネクタは後続のデータベース変更のイベント レコードのストリーミングを開始します。
      • InitialOnly: コネクタは、論理サーバー名のオフセットが記録されていない場合にのみスナップショットを実行します。 スナップショットが完了すると、コネクタは停止します。 binlog から変更イベントを読み取るためにストリーミングに移行することはありません。
      • NoData: コネクタは、スキーマのみをキャプチャするスナップショットを実行しますが、テーブル データはキャプチャしません。 データの一貫性のあるスナップショットは必要ないが、コネクタの起動後に発生する変更のみが必要な場合は、このオプションを設定します。
    • ハートビート アクション クエリ: コネクタがハートビート メッセージを送信するときに、ソース データベースでコネクタが実行するクエリを指定します。

    • スナップショット選択ステートメントのオーバーライド: スナップショットに含めるテーブル行を指定します。 スナップショットにテーブル内の行のサブセットのみを含める場合は、このプロパティを使用します。 このプロパティはスナップショットにのみ影響します。 コネクタがログから読み取るイベントには適用されません。

ストリームまたはソースの詳細

  1. [ 接続 ] ページで、Eventstream または Real-Time ハブのどちらを使用しているかに基づいて、次のいずれかの手順に従います。

    • Eventstream:

      右側の [ ソースの詳細 ] ウィンドウで、次の手順に従います。

      1. [ソース名] で、[鉛筆] ボタンを選択して名前を変更します。

      2. Eventstream 名Stream 名は読み取り専用であることに注意してください。

    • リアルタイムハブ:

      右側の [ストリームの詳細] セクションで、次の手順に従います。

      1. イベントストリームを作成するFabricワークスペースを選択します。

      2. "イベントストリーム名" の鉛筆ボタンを選択し、イベントストリームの名前を入力します。

      3. Stream 名の値は、eventstream の名前に -stream を追加することによって自動的に生成されます。 このストリームは、ウィザードの終了時にリアルタイム ハブの [すべてのデータ ストリーム ] ページに表示されます。

  2. [構成] ページの下部にある [ 次へ ] を選択 します。

確認して接続する

[ 確認と接続 ] 画面で概要を確認し、[ 追加] (Eventstream) または [接続 ] (Real-Time ハブ) を選択します。

スキーマ処理ページ

  1. スキーマ処理手順で、次のいずれかのオプションを選択します。

    • 分析対応イベントと自動更新スキーマ (DeltaFlow プレビュー):コネクタは、ソース テーブル構造をミラーリングする分析対応ストリームに未加工の CDC イベントを変換します。 DeltaFlow は、変更の種類 (挿入、更新、削除) やタイムスタンプなどのメタデータを使用してイベントを強化し、変換先テーブルとスキーマの進化を自動的に管理します。
    • 未加工の CDC イベント: コネクタは、生の CDC イベントを取り込んで使用できるようにします。 必要に応じて、コネクタはテーブル スキーマを自動検出し、スキーマ レジストリに登録できます。 DeltaFlow 変換を使用せずにスキーマ認識を行う場合は、このオプションを使用します。

    次のスクリーンショットは、CDC Azure SQL Database示しています。 スキーマ処理オプションは、サポートされているすべての CDC ソース コネクタで同じです。

    CDC ソース コネクタの DeltaFlow および Raw CDC イベント オプションを含むスキーマ処理手順を示すスクリーンショット。

  2. イベント スキーマの関連付けを有効にします。

  3. Workspace で、スキーマ セットのFabric ワークスペースを選択します。

  4. [ スキーマ セット] では、[ + 作成 ] が既定で選択され、新しいスキーマ セットが作成されます。 既存のイベント スキーマ セットを選択するように変更できます。

  5. 前の手順で [+ 作成 ] オプションを選択した場合は、スキーマ セットの名前を入力します。

  6. [ 確認と接続 ] ページで概要を確認し、[ 追加 ] (Eventstream) または [接続 ] (Real-Time ハブ) を選択します。

    拡張機能を備えた PostgreSQL データベース コネクタの [確認と作成] ページを示すスクリーンショット。

    PostgreSQL データベース内のすべてのテーブルまたは選択したテーブルに対して、コネクタによって自動検出が行われ、スキーマが作成され、スキーマ レジストリに登録されます。

DeltaFlow: Analytics 対応イベント変換 (プレビュー)

Analytics 対応イベントと自動更新スキーマ (DeltaFlow) を有効にすると、コネクタは次の機能を提供します。

  • 分析対応のイベント形状: 生の Debezium CDC イベントは、ソース テーブル構造を反映する表形式に変換されます。 イベントは、変更の種類 (insertupdate、または delete) やイベントタイムスタンプなどのメタデータ列でエンリッチされます。
  • 自動宛先テーブル管理: DeltaFlow 対応ストリームをイベントハウスなどのサポート対象の宛先にルーティングすると、変換先テーブルがソース テーブル スキーマと一致するように自動的に作成されます。 コピー先テーブルを手動で作成または構成する必要はありません。
  • スキーマの進化の処理: ソース データベース テーブルが変更されると (たとえば、新しい列が追加されたり、テーブルが作成されたりすると)、DeltaFlow は変更を自動的に検出し、登録されたスキーマを更新し、それに応じて変換先テーブルを調整します。 この動作により、スキーマの変更による手動による介入が最小限に抑えられます。

DeltaFlow (プレビュー) は現在、Azure SQL Database CDC、Azure SQL Managed Instance CDC、仮想マシン CDC のSQL Server、PostgreSQL CDC ソース コネクタでサポートされています。

DeltaFlow が未加工の CDC イベントを分析対応の出力 (操作の種類やメタデータ列など) に変換する方法の詳細については、「 DeltaFlow 出力変換」を参照してください。

更新されたイベントストリームを表示する

  1. [編集モード] でイベントストリームに追加された PostgreSQL データベースの CDC ソースを確認できます。

    拡張機能を含む編集ビューでの PostgreSQL DB CDC ソースのストリーミングのスクリーンショット。

  2. この新しく追加された PostgreSQL DB CDC ソースを実装するには、公開する を選択します。 これらの手順を完了すると、PostgreSQL DB CDC ソースを [ライブ ビュー] で視覚化できるようになります。

    拡張機能を含むライブ ビューでの PostgreSQL DB CDC ソースのストリーミングのスクリーンショット。

スキーマを使用するように Eventstream の宛先を構成する

現在、関連付けられたスキーマを持つ Eventstream では、Eventhouse、カスタム エンドポイント、および派生ストリーム変換先のみがサポートされています。 このセクションでは、イベントストリームに対して拡張機能 (スキーマ サポートなど) が有効になっている場合に Eventhouse 変換先を追加および構成する方法について説明します。

サポートされている変更データ キャプチャ (CDC) ソースで DeltaFlow (プレビュー) を使用すると、Eventhouse 内の変換先テーブルが自動的に作成され、ソース テーブルの構造と一致するように管理されます。 宛先テーブル スキーマを手動で構成する必要はありません。 DeltaFlow では、ソース テーブルが変更されたときにスキーマの進化も自動的に処理されます。

カスタム エンドポイント変換先のスキーマを構成する

  1. [ イベントの変換] または [宛先の追加] を選択し、[ CustomEndpoint] を選択します。

  2. [ カスタム エンドポイント ] ウィンドウで、宛先の名前を指定します。

  3. [入力スキーマ] で、イベントのスキーマを選択します。 このボックスでは、イベントストリームのスキーマサポートを有効にするときに選択します。

カスタム エンドポイントを構成するためのペインを示すスクリーンショット。

カスタム エンドポイントの送信先を構成する詳細な手順については、「 カスタム エンドポイントまたはカスタム アプリの宛先をイベントストリームに追加する」を参照してください。

イベントハウスの宛先に対するスキーマを設定する

  1. [ イベントの変換] または [宛先の追加] を選択し、[ Eventhouse] を選択します。

  2. [Eventhouse] ウィンドウで、次のスキーマ関連の設定を構成します。

    1. [入力スキーマ] で、ドロップダウン リストから 1 つ以上のスキーマを選択します。

      入力スキーマが選択されている eventhouse 構成ウィンドウを示すスクリーンショット。

      Event Hubs ソースを構成するときに [ヘッダーによる動的スキーマ ] オプションを選択した場合は、ソースに対して複数のスキーマを構成し、それらをさまざまなプロパティとその値にマップしている可能性があります。

    2. [テーブルの作成方法] では、要件に応じて、すべてのスキーマを結合した 1 つのテーブルを選択するか、スキーマごとに個別のテーブルを選択します。

      テーブルの作成方法を含む eventhouse 構成ウィンドウを示すスクリーンショット。

    3. [ データの書き込み方法] で、次のいずれかのオプションを選択します。

      • ペイロードのみ: 抽出されたペイロード データをテーブルに書き込みます。 複数の入力スキーマがある場合、データは複数のテーブルに送信されます。
      • メタデータとペイロード: メタデータとペイロード データを 1 つのテーブルに書き込みます。 列の例には、 sourcesubjecttype、および dataがあります。

      データを書き込むためのオプションを含む eventhouse 構成ウィンドウを示すスクリーンショット。

イベントハウスの宛先を構成する詳細な手順については、「 イベントストリームにイベントハウスの宛先を追加する」を参照してください。

DeltaFlow 分析対応の出力を表示する (プレビュー)

Analytics 対応イベントと自動更新スキーマ (DeltaFlow) を有効にした場合、変換先テーブルはソース データベース テーブルを反映する図形に自動的に作成されます。 各テーブルには、元の列と、変更の種類とタイムスタンプのメタデータ列が含まれます。

次のスクリーンショットは、CDC Azure SQL Database示しています。 DeltaFlow 変換先テーブルの出力は、サポートされているすべての CDC ソース コネクタで同じです。

分析対応の図形で DeltaFlow によって作成された Eventhouse 変換先テーブルを示すスクリーンショット。

未加工の Debezium CDC ペイロードを解析しなくても、Kusto クエリ言語 (KQL) またはその他の分析ツールを使用してこれらのテーブルに対してクエリを実行できます。

その他のコネクタ。