この記事では、PostgreSQL データベース変更データ キャプチャ (CDC) を Fabric Real-Time ハブのイベント ソースとして追加する方法について説明します。
Microsoft Fabric イベント ストリームの PostgreSQL データベース 変更 データ キャプチャ (CDC) ソース コネクタは、PostgreSQL データベースの現在のデータのスナップショットをキャプチャすることができます。 現在、PostgreSQL データベース変更データ キャプチャ (CDC) は、データベースにパブリックにアクセスできる次のサービスからサポートされています。
- Azure Database for PostgreSQL
- Amazon RDS for PostgreSQL
- Amazon Aurora PostgreSQL
- Google Cloud SQL for PostgreSQL
PostgreSQL データベース CDC ソースがイベントストリームに追加されると、指定したテーブルに対する行レベルの変更がキャプチャされます。 その後、これらの変更をリアルタイムで処理し、別の宛先に送信して詳細な分析を行うことができます。
注
DeltaFlow (プレビュー) を使用すると、未加工の Debezium CDC イベントを、ソース テーブル構造をミラーリングする分析対応ストリームに変換できます。 DeltaFlow を使用すると、スキーマの登録、宛先テーブルの管理、スキーマの進化の処理が自動化されます。 DeltaFlow を使用するには、スキーマ処理手順中に [Analytics 対応イベント] と [スキーマの自動更新 ] を選択します。
Prerequisites
共同作成者以上のアクセス許可を持つ、Fabric 容量ライセンス モード (または試用版ライセンス モード) のワークスペースへのアクセス。
PostgreSQLデータベースへの登録済みユーザーアクセス。
PostgreSQL データベースにはパブリックにアクセスでき、ファイアウォールの内側に置いたり、仮想ネットワークでセキュリティで保護したりしないでください。 保護されたネットワークに存在する場合は、 Eventstream コネクタの仮想ネットワークインジェクションを使用して接続します。
PostgreSQL データベースとテーブルで CDC が有効になっている。
PostgreSQL 用の Azure データベースがある場合は、次のセクションの手順に従って CDC を有効にします。 詳細の情報については「論理レプリケーションと論理デコード - PostgreSQL 用 Azure データベース - フレキシブル サーバー」を参照してください。
その他の PostgreSQL データベースについては「PostgreSQL 用の Debezium コネクタ :: Debezium のドキュメント」を参照してください。
イベントストリームがない場合は、イベントストリームを作成します。
PostgreSQL データベースで CDC を有効にする
このセクションでは、 Azure Database for PostgreSQL を例として使用します。
Azure Database for PostgreSQL フレキシブル サーバーで CDC を有効にするには、次の手順に従います。
Azure portal の Azure Database for PostgreSQL フレキシブル サーバー ページで、ナビゲーション メニューの [サーバー パラメーター] を選択します。
サーバーパラメータページで
- wal_levelをlogicalに設定します。
- max_worker_processesを少なくとも16 に更新します。
変更点を保存し、サーバーを再起動します。
Azure Database for PostgreSQL フレキシブル サーバー インスタンスがパブリック ネットワーク トラフィックを許可していることを確認します。
次の SQL ステートメントを実行して、管理者 ユーザーにレプリケーションアクセス許可権限を許可します。 他のユーザー アカウントを使用して PostgreSQL データベース (DB) を接続して CDC をフェッチする場合は、ユーザーが テーブル所有者であることを確認します。
ALTER ROLE <admin_user_or_table_owner_user> WITH REPLICATION;
[データ ソース] ページに移動する
Microsoft Fabric にサインインします。
ページの左下に [Power BI] が表示されている場合は、[Power BI] を選択してから [Fabric] を選択することで Fabric ワークロードに切り替えます。
左側のナビゲーション バーで [リアルタイム] を選択します。
[ ストリーミング データ ] ページが既定で開きます。 [データの 追加 ] ボタンをクリックして、[ データ ソース ] ページに移動します。
左側のナビゲーション バーの [データの追加] オプションを選択して、[データ ソース] ページに直接アクセスすることもできます。
ソースの種類として PostgreSQL データベース CDC を選択する
[データ ソース] ページで、上部にある Microsoft ソース カテゴリを選択し、Azure DB for PostgreSQL (CDC) タイルで [接続] を選択します。
Azure Database for PostgreSQL CDC ソースの構成
CDC を介して Eventstream にテーブル スキーマを自動登録して PostgreSQL データベースから変更データを取り込みます。
注
DeltaFlow (プレビュー):スキーマ処理手順で Analytics 対応イベントと自動更新されたスキーマ を選択すると、DeltaFlow は未加工の Debezium CDC イベントを、ソース テーブル構造をミラーリングする分析対応ストリームに変換します。 DeltaFlow では、変換先テーブルの作成とスキーマの進化処理も自動化されます。
接続ページで、新しい接続を選択します。
接続設定セクションで、次の情報を入力します。
サーバー: PostgreSQL データベースのサーバー アドレス (my-pgsql-server.postgres.database.azure.com など)。
データベース: データベース名 (my_database など)。
接続名: 接続の名前を入力します。
[認証の種類] で [ 基本 ] を選択し、データベースの ユーザー名 と パスワード を入力します。
注
現時点では、Fabric イベント ストリームでは 基本 認証のみがサポートされています。
[ 接続 ] を選択して接続設定を完了します。
ポート: サーバーのポート番号を入力します。 既定値は 5432 です。 選択したクラウド接続が [接続とゲートウェイの管理] で構成されている場合は、ポート番号がそこに設定されているポート番号と一致していることを確認します。 一致しない場合は、[ 接続とゲートウェイの管理 ] のクラウド接続のポート番号が優先されます。
データベース テーブルから変更をキャプチャする場合は、次の 2 つのオプションから選択できます。
- すべてのテーブル: データベース内のすべてのテーブルからの変更をキャプチャします。
-
テーブル名の入力: コンマ区切りリストを使用して、テーブルのサブセットを指定できます。
schemaName.tableName形式の完全なテーブル識別子または有効な正規表現のいずれかを使用できます。 例: -
dbo.test.*:testスキーマで名前がdboで始まるすべてのテーブルを選択します。 -
dbo\.(test1|test2):dbo.test1とdbo.test2を選択します。
リスト内の両方の形式を組み合わせることができます。 エントリ全体の文字制限の合計は 102,400 文字です。
スロット名 (省略可能): 特定のデータベース/スキーマの特定のプラグインからの変更をストリーミングするために作成された PostgreSQL 論理デコード スロットの名前を入力します。 サーバーはこのスロットを使用して、イベントを Eventstream ストリーミング コネクタにストリーミングします。 小文字、数字、アンダースコアのみを含める必要があります。
- 指定しない場合は、適切なデータベースのアクセス許可を必要とする、スロットの作成に GUID が使用されます。
- 指定されたスロット名が存在する場合、コネクタはそれを直接使用します。
[ 詳細設定] を展開して、PostgreSQL データベース CDC ソースのその他の構成オプションにアクセスします。
パブリケーション名: 使用する PostgreSQL 論理レプリケーション パブリケーションの名前を指定します。 この値は、データベース内の既存のパブリケーションと一致する必要があります。または、自動作成モードに応じて自動的に作成されます。 既定値:
dbz_publication。注
コネクタ ユーザーは、パブリケーションを作成するためのスーパーユーザーのアクセス許可を持っている必要があります。 アクセス許可関連の問題を回避するために、コネクタを初めて起動する前に、パブリケーションを手動で作成することをお勧めします。
パブリケーションの自動作成モード: パブリケーションを自動的に作成するかどうかを制御します。 オプションは次のとおりです。
-
Filtered(既定値): 指定したパブリケーションが存在しない場合、コネクタは選択したテーブルのみを含むパブリケーションを作成します (テーブルインクルード リストで指定されています)。 -
AllTables: 指定したパブリケーションが存在する場合、コネクタはそれを使用します。 存在しない場合、コネクタはデータベース内のすべてのテーブルを含むテーブルを作成します。 -
Disabled: コネクタはパブリケーションを作成しません。 指定したパブリケーションがない場合、コネクタはエラーをスローして停止します。 この場合、パブリケーションをデータベースに手動で作成する必要があります。
詳細については、パブリケーションの自動作成モードに関する Debezium ドキュメントを参照してください。
-
10 進処理モード: コネクタが PostgreSQL
DECIMALとNUMERIC列の値を処理する方法を指定します。-
Precise: 正確な 10 進型 (JavaBigDecimalなど) を使用して値を表し、データ表現の完全な精度と精度を確保します。 -
Double: 値を倍精度浮動小数点数に変換します。 このオプションを選択すると、使いやすさとパフォーマンスが向上しますが、精度が低下する可能性があります。 -
String: 値を書式設定された文字列としてエンコードします。 このオプションを使用すると、ダウンストリーム システムで簡単に使用できますが、元の数値型に関するセマンティック情報は失われます。
-
スナップショット モード: コネクタの起動時にスナップショットを実行するための条件を指定します。
-
Initial: コネクタは、論理サーバー名のオフセットが記録されなかった場合、または以前のスナップショットが完了しなかったことを検出した場合にのみ、スナップショットを実行します。 スナップショットが完了すると、コネクタは後続のデータベース変更のイベント レコードのストリーミングを開始します。 -
InitialOnly: コネクタは、論理サーバー名のオフセットが記録されていない場合にのみスナップショットを実行します。 スナップショットが完了すると、コネクタは停止します。 binlog から変更イベントを読み取るためにストリーミングに移行することはありません。 -
NoData: コネクタは、スキーマのみをキャプチャするスナップショットを実行しますが、テーブル データはキャプチャしません。 データの一貫性のあるスナップショットは必要ないが、コネクタの起動後に発生する変更のみが必要な場合は、このオプションを設定します。
-
ハートビート アクション クエリ: コネクタがハートビート メッセージを送信するときに、ソース データベースでコネクタが実行するクエリを指定します。
スナップショット選択ステートメントのオーバーライド: スナップショットに含めるテーブル行を指定します。 スナップショットにテーブル内の行のサブセットのみを含める場合は、このプロパティを使用します。 このプロパティはスナップショットにのみ影響します。 コネクタがログから読み取るイベントには適用されません。
ストリームまたはソースの詳細
[ 接続 ] ページで、Eventstream または Real-Time ハブのどちらを使用しているかに基づいて、次のいずれかの手順に従います。
Eventstream:
右側の [ ソースの詳細 ] ウィンドウで、次の手順に従います。
[ソース名] で、[鉛筆] ボタンを選択して名前を変更します。
Eventstream 名と Stream 名は読み取り専用であることに注意してください。
リアルタイムハブ:
右側の [ストリームの詳細] セクションで、次の手順に従います。
イベントストリームを作成する Fabric ワークスペース を選択します。
"イベントストリーム名" の鉛筆ボタンを選択し、イベントストリームの名前を入力します。
Stream 名の値は、eventstream の名前に -stream を追加することによって自動的に生成されます。 このストリームは、ウィザードの終了時にリアルタイム ハブの [すべてのデータ ストリーム ] ページに表示されます。
[構成] ページの下部にある [ 次へ ] を選択 します。
確認して接続する
[ 確認と接続 ] 画面で概要を確認し、[ 追加] (Eventstream) または [接続 ] (Real-Time ハブ) を選択します。
スキーマ処理ページ
スキーマ処理手順で、次のいずれかのオプションを選択します。
- 分析対応イベントと自動更新スキーマ (DeltaFlow プレビュー):コネクタは、ソース テーブル構造をミラーリングする分析対応ストリームに未加工の CDC イベントを変換します。 DeltaFlow は、変更の種類 (挿入、更新、削除) やタイムスタンプなどのメタデータを使用してイベントを強化し、変換先テーブルとスキーマの進化を自動的に管理します。
- 未加工の CDC イベント: コネクタは、生の CDC イベントを取り込んで使用できるようにします。 必要に応じて、コネクタはテーブル スキーマを自動検出し、スキーマ レジストリに登録できます。 DeltaFlow 変換を使用せずにスキーマ認識を行う場合は、このオプションを使用します。
注
次のスクリーンショットは、Azure SQL Database CDC を示しています。 スキーマ処理オプションは、サポートされているすべての CDC ソース コネクタで同じです。
イベント スキーマの関連付けを有効にします。
[ワークスペース] で、スキーマ セットのファブリック ワークスペースを選択します。
[ スキーマ セット] では、[ + 作成 ] が既定で選択され、新しいスキーマ セットが作成されます。 既存のイベント スキーマ セットを選択するように変更できます。
前の手順で [+ 作成 ] オプションを選択した場合は、スキーマ セットの名前を入力します。
[ 確認と接続 ] ページで概要を確認し、[ 追加 ] (Eventstream) または [接続 ] (Real-Time ハブ) を選択します。
PostgreSQL データベース内のすべてのテーブルまたは選択したテーブルに対して、コネクタによって自動検出が行われ、スキーマが作成され、スキーマ レジストリに登録されます。
DeltaFlow: Analytics 対応イベント変換 (プレビュー)
Analytics 対応イベントと自動更新スキーマ (DeltaFlow) を有効にすると、コネクタは次の機能を提供します。
-
分析対応のイベント形状: 生の Debezium CDC イベントは、ソース テーブル構造を反映する表形式に変換されます。 イベントは、変更の種類 (
insert、update、またはdelete) やイベントタイムスタンプなどのメタデータ列でエンリッチされます。 - 自動宛先テーブル管理: DeltaFlow 対応ストリームをイベントハウスなどのサポート対象の宛先にルーティングすると、変換先テーブルがソース テーブル スキーマと一致するように自動的に作成されます。 コピー先テーブルを手動で作成または構成する必要はありません。
- スキーマの進化の処理: ソース データベース テーブルが変更されると (たとえば、新しい列が追加されたり、テーブルが作成されたりすると)、DeltaFlow は変更を自動的に検出し、登録されたスキーマを更新し、それに応じて変換先テーブルを調整します。 この動作により、スキーマの変更による手動による介入が最小限に抑えられます。
注
DeltaFlow (プレビュー) は現在、Azure SQL Database CDC、Azure SQL Managed Instance CDC、仮想マシン CDC 上の SQL Server、PostgreSQL CDC ソース コネクタでサポートされています。
DeltaFlow が未加工の CDC イベントを分析対応の出力 (操作の種類やメタデータ列など) に変換する方法の詳細については、「 DeltaFlow 出力変換」を参照してください。
データ ストリームの詳細を表示する
[ 確認と接続 ] ページで [ イベントストリームを開く] を選択すると、ウィザードによって、選択した PostgreSQL データベース CDC をソースとして作成したイベントストリームが開きます。 ウィザードを閉じるには、ページの下部にある [完了] を選択します。
Real-Time ハブのホーム ページの [最近のストリーミング データ] セクションにストリームが表示されます。 詳細な手順については、「Fabric リアルタイム ハブでのデータ ストリームの詳細の表示」を参照してください。
関連コンテンツ
データ ストリームの使用について詳しくは、次の記事を参照してください。