MySQL データベース CDC ソースをイベントストリームに追加する

この記事では、MySQL Change Data Capture ソースをイベントストリームに追加する方法について説明します。

Azure MySQL Database Change Data Capture (CDC) コネクタを使用すると、Azure MySQL データベース内の現在のデータのスナップショットをキャプチャできます。 監視対象のテーブルを指定し、それらのテーブルの行レベルで変更があった場合にアラートを受け取ります。 変更がストリームにキャプチャされたら、この CDC データをリアルタイムで処理し、Fabric内のさまざまな宛先に送信して、さらに処理または分析することができます。

現在、MySQL Database CDC は、データベースにパブリックにアクセスできる次のサービスからサポートされています。

  • Azure Database for MySQL
  • Amazon RDS for MySQL
  • Amazon Aurora MySQL
  • Google Cloud SQL for MySQL (GCP)

このガイドでは、例として Azure Database for MySQL CDC を使用します。

MySQL データベース CDC ソースがイベントストリームに追加されると、指定したテーブルに対する行レベルの変更がキャプチャされます。 その後、これらの変更をリアルタイムで処理し、別の宛先に送信して詳細な分析を行うことができます。

前提条件

  • 共同作成者以上のアクセス許可を持つ、Fabric容量ライセンス モード (または試用版ライセンス モード) のワークスペースへのアクセス。
  • MySQL データベースのインスタンス (Azure Database for MySQL - フレキシブル サーバー内のデータベースなど) へのアクセス。
  • MySQL データベースにはパブリックにアクセスでき、ファイアウォールの内側に置いたり、仮想ネットワークでセキュリティで保護したりしないでください。 保護されたネットワークに存在する場合は、 Eventstream コネクタの仮想ネットワークインジェクションを使用して接続します。
  • イベントストリームがない場合は、イベントストリームを作成します。

MySQL データベースを設定する

コネクタは Debezium MySQL コネクタを使用して、MySQL データベースの変更をキャプチャします。 Messaging Connector が変更をキャプチャできるすべてのデータベースに対して適切な権限を持つ MySQL ユーザーを定義する必要があります。 管理者ユーザーを直接使用して、通常は適切な特権を持つデータベースに接続することも、次の手順に従って新しいユーザーを作成することもできます。

新しいユーザーまたは管理者アカウントと対応するパスワードは、後で Eventstream 内のデータベースに接続するために使用されます。

  1. コマンド プロンプトで mysql 、MySQL ユーザーを作成します。

    mysql> CREATE USER 'user'@'%' IDENTIFIED BY 'password';
    
  2. 必要な権限をユーザーに付与します。

    mysql> GRANT SELECT, SHOW DATABASES, REPLICATION REPLICA, REPLICATION CLIENT ON *.* TO 'user'@'%';
    

    Amazon Relational Database Service (RDS) や Aurora などのホストされているオプションと同様に、グローバル読み取りロックが使用できない場合は、テーブル レベルのロックを使用して一貫性のあるスナップショットが作成されます。 この場合は、ユーザーに LOCK TABLES アクセス許可を付与する必要があります。 さらに、スナップショット中の FLUSH 操作をサポートするために、 RELOAD または FLUSH_TABLES 権限を付与する必要がある場合もあります。

  3. ユーザーのアクセス許可を最終処理します。

    mysql> FLUSH PRIVILEGES;
    

ユーザーまたは管理者に必要な特権が付与されているかどうかを確認するには、次のコマンドを実行し、手順 2 で必要な特権を表示する必要があります。

SHOW GRANTS FOR user;

必要なアクセス許可をユーザーに付与する方法の詳細については、「MySQL 用 Debezium コネクタ: Debezium ドキュメント」を参照してください。

binlog を有効にする

MySQL レプリケーションのバイナリ ログを有効にする必要があります。 バイナリ ログには、変更を反映するためのレプリケーション ツールのトランザクションの更新が記録されます。 このセクションでは、構成手順を示す例として、Azure Database for MySQL CDC を使用します。

  1. Azure Database for MySQL アカウントのAzure ポータル ページで、左側のナビゲーションの Settings の下にある Server parameters を選択します。

  2. プロジェクト パラメーター ページで、次の設定を完了し、保存を選択します。

    • binlog_row_imageの場合は、[完全] を選択します

    • binlog_expire_logs_secondsの場合は、バイナリ ログ ファイルが消去されるまでにサービスが待機する秒数を設定します。 環境のニーズに合わせて値を設定します。例えば、86400 などです。

    サーバー パラメーターの下のレプリケーションの binlog 設定のスクリーンショット。

MySQL データベース (DB) 変更データ キャプチャ (CDC) をソースとして追加する

まだ eventstream にソースを追加していない場合は、[ データ ソースの接続 ] タイルを選択します。 リボンの [ ソースの追加>データ ソースの接続 ] を選択することもできます。

外部ソースを使用するためのタイルの選択を示すスクリーンショット。

既に公開されているイベントストリームにソースを追加する場合は、 編集 モードに切り替えます。 リボンで、[ソースの追加] >[データ ソースの接続] を選択します。

外部ソースを追加するための選択を示すスクリーンショット。

[データ ソースの選択] ページにおいて、[MySQL DB (CDC)] タイルで [接続] を探して選択します。

イベントの取得ウィザードでのソースの種類としての MySQL DB (CDC) の選択を示すスクリーンショット。

MySQL DB (CDC) の構成と接続

  1. [接続] 画面の [接続] で、[新しい接続] を選択してクラウド接続を作成します。

    [接続] ページを示すスクリーンショット。

  2. MySQL データベースの次の 接続設定接続資格情報 を入力し、[ 接続] を選択します。

    • サーバー: MySQL データベースのサーバー アドレス (例: my-mysql-server.mysql.database.azure.com)。

    • データベース: データベース名 (my_database など)。

    • 接続名: 自動生成されるか、この接続の新しい名前を入力できます。

    • UsernamePassword: MySQL データベースの資格情報を入力します。 サーバー管理者アカウントまたは必要な権限を付与して作成されたユーザー アカウントを入力してください。

      Azure MySQL Database (DB) Change Data Capture (CDC)の接続設定のスクリーンショット。

  3. MySQL DB CDC データ ソースを構成するには、次の情報を入力し、[次へ]選択します。

    • ポート: 既定値は 3306 です。 選択したクラウド接続が [接続とゲートウェイの管理] で構成されている場合は、ポート番号がそこに設定されているポート番号と一致していることを確認します。 一致しない場合は、[ 接続とゲートウェイの管理 ] のクラウド接続のポート番号が優先されます。

    • テーブル: [すべてのテーブル ] または [テーブル名を入力] を選択します。 後者を選択する場合は、完全なテーブル識別子 (databaseName.tableName) または有効な正規表現のコンマ区切りの一覧を使用してテーブルを指定します。 例えば次が挙げられます。

      • databaseName.test.*を使用して、名前が databaseName.test で始まるすべてのテーブルを選択します。
      • databaseName\.(test1|test2)を使用して、databaseName.test1databaseName.test2を選択します。

      両方の形式をコンマで混在させることができます。 エントリ全体の文字制限の合計は 102,400 文字です。

    • サーバー ID: MySQL クラスター内のサーバーとレプリケーション クライアントごとに一意の値を入力します。 既定値は 1000 です。

    リーダーごとに異なるサーバー ID を設定します。 binlog を読み取るすべての MySQL データベース クライアントには、サーバー ID と呼ばれる一意の ID が必要です。 MySQL サーバーはこの ID を使用して、ネットワーク接続と Binlog のポジションを維持します。 同じサーバー ID を共有するジョブが異なると、間違った binlog 位置から読み取る可能性があります。 そのため、リーダーごとに異なるサーバー ID を設定することをお勧めします。

  4. [詳細設定] を展開すると、MySQL データベース CDC ソースのその他の構成オプションにアクセスできます。

    • スナップショット ロック モード: オプションは次のとおりです。
      • Minimal (default): スキーマとメタデータをキャプチャするために、初期フェーズ中にのみグローバル読み取りロックを保持します。 スナップショットの残りの部分では REPEATABLE READ トランザクションが使用され、データの読み取り中に更新が許可されます。
      • Extended: スナップショット期間全体のグローバル読み取りロックを維持し、すべての書き込みをブロックします。 書き込みブロックが許容される場合は、完全な一貫性を確保するために使用します。
      • None: スナップショット中のテーブル ロックの取得をスキップします。 プロセス中にスキーマの変更が発生しない場合にのみ安全です。
    • 10 進処理モード: コネクタが DECIMALNUMERIC 列の値を処理する方法を指定します。
      • Precise: 正確な 10 進型 (Java BigDecimal など) を使用して値を表し、データ表現の完全な精度と精度を確保します。
      • Double: 値を倍精度浮動小数点数に変換します。 このオプションを選択すると、使いやすさとパフォーマンスが向上しますが、精度が低下する可能性があります。
      • String: 値を書式設定された文字列としてエンコードします。 このオプションを使用すると、ダウンストリーム システムで簡単に使用できますが、元の数値型に関するセマンティック情報は失われます。
    • スナップショット モード: コネクタの起動時にスナップショットを実行するための条件を指定します。
      • Initial: コネクタは、論理サーバー名のオフセットが記録されていない場合、または以前のスナップショットが完了しなかったことを検出した場合にのみ、スナップショットを実行します。 スナップショットが完了すると、コネクタは後続のデータベース変更のイベント レコードのストリーミングを開始します。
      • InitialOnly: コネクタは、論理サーバー名のオフセットが記録されていない場合にのみスナップショットを実行します。 スナップショットが完了すると、コネクタは停止します。 binlog から変更イベントを読み取るためにストリーミングに移行することはありません。
      • NoData: コネクタは、スキーマのみをキャプチャするスナップショットを実行しますが、テーブル データはキャプチャしません。 データの一貫性のあるスナップショットは必要ないが、コネクタの起動後に発生する変更のみが必要な場合は、このオプションを設定します。

ストリームまたはソースの詳細

  1. [ 接続 ] ページで、Eventstream または Real-Time ハブのどちらを使用しているかに基づいて、次のいずれかの手順に従います。

    • Eventstream:

      右側の [ ソースの詳細 ] ウィンドウで、次の手順に従います。

      1. [ソース名] で、[鉛筆] ボタンを選択して名前を変更します。

      2. Eventstream 名Stream 名は読み取り専用であることに注意してください。

    • リアルタイムハブ:

      右側の [ストリームの詳細] セクションで、次の手順に従います。

      1. イベントストリームを作成するFabricワークスペースを選択します。

      2. "イベントストリーム名" の鉛筆ボタンを選択し、イベントストリームの名前を入力します。

      3. Stream 名の値は、eventstream の名前に -stream を追加することによって自動的に生成されます。 このストリームは、ウィザードの終了時にリアルタイム ハブの [すべてのデータ ストリーム ] ページに表示されます。

  2. [構成] ページの下部にある [ 次へ ] を選択 します。

確認して接続する

[ 確認と接続 ] 画面で概要を確認し、[ 追加] (Eventstream) または [接続 ] (Real-Time ハブ) を選択します。

更新されたイベントストリームを表示する

  1. MySQL DB (CDC) ソースが、編集モードのでイベントストリームに追加されていることがわかります。

    [発行] ボタンが強調表示された編集モードで追加された Azure mySQL DB CDC ソースのスクリーンショット。

  2. [発行] を選択して変更を発行し、イベントストリームへの MySQL DB CDC データのストリーミングを開始します。

    LiveモードのAzure MySQL DB CDCソースを追加したスクリーンショット。

その他のコネクタ。