次の方法で共有


マッピング データ フローを使用して安全にデータを変換する

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

Data Factory in Microsoft Fabric は、よりシンプルなアーキテクチャ、組み込みの AI、および新機能を備えた次世代のAzure Data Factoryです。 データ統合を初めて使用する場合は、Fabric Data Factory から始めます。 既存の ADF ワークロードをFabricにアップグレードして、データ サイエンス、リアルタイム分析、レポートの新機能にアクセスできます。

Azure Data Factoryを初めて使用する場合は、Azure Data Factory の概要を参照してください。

このチュートリアルでは、Data Factory ユーザー インターフェイス (UI) を使用して、Azure Data Lake Storage Gen2 ソースから Data Lake Storage Gen2 シンクにデータ をコピーして変換するパイプラインを作成します (選択したネットワークにのみアクセスできます) Data Factory ManagedVirtual Network。 このチュートリアルの構成パターンは、マッピング データ フローを使用してデータを変換するときに拡張することができます。

このチュートリアルでは、次の手順を実行します。

  • データ ファクトリを作成します。
  • データ フロー アクティビティが含まれるパイプラインを作成します。
  • 4 つの変換を使用して、マッピング データ フローを構築します。
  • パイプラインをテスト実行します。
  • データ フロー アクティビティを監視します。

前提条件

  • Azure サブスクリプション。 Azure サブスクリプションをお持ちでない場合は、開始する前に、free Azure アカウントを作成します。
  • Azure ストレージ アカウント。 Data Lake Storageは、source および sink データ ストアとして使用します。 ストレージ アカウントをお持ちでない場合は、「Azure ストレージ アカウントを作成する」を参照してください。 ストレージ アカウントで、選択したネットワークからのアクセスのみが許可されていることを確認します。

このチュートリアルで変換するファイルは moviesDB.csvであり、このGitHubコンテンツ サイトにあります。 GitHubからファイルを取得するには、任意のテキスト エディターに内容をコピーして、.csv ファイルとしてローカルに保存します。 ストレージ アカウントにファイルをアップロードするには、Azure ポータルで BLOB をアップロードするを参照してください。 例では、sample-data という名前のコンテナーを参照します。

Data Factory の作成

この手順では、データ ファクトリを作成し、Data Factory UI を開いて、データ ファクトリにパイプラインを作成します。

  1. Microsoft Edgeまたは Google Chrome を開きます。 現在、Data Factory UI をサポートするのは、Microsoft Edgeブラウザーと Google Chrome Web ブラウザーのみです。

  2. 左側のメニューで、 [リソースの作成]>[分析]>[Data Factory] の順に選択します。

  3. [新しいデータ ファクトリ] ページで、 [名前] に「ADFTutorialDataFactory」と入力します。

    データ ファクトリの名前は "グローバルに一意" にする必要があります。 データ ファクトリの名前の値に関するエラー メッセージが表示された場合は、別の名前を入力してください (yournameADFTutorialDataFactory など)。 Data Factory アーティファクトの名前付け規則については、Data Factory の名前付け規則に関するページを参照してください。

  4. データ ファクトリを作成する Azure サブスクリプションを選択します。

  5. [リソース グループ] で、次の手順のいずれかを行います。

    • [Use existing (既存のものを使用)] を選択し、ドロップダウン リストから既存のリソース グループを選択します。
    • [新規作成] を選択し、リソース グループの名前を入力します。

    リソース グループの詳細については、「リソース グループを使用してAzure リソースを管理するを参照してください。

  6. [バージョン] で、 [V2] を選択します。

  7. [場所] で、データ ファクトリの場所を選択します。 サポートされている場所のみがドロップダウン リストに表示されます。 データ ファクトリで使用されるデータ ストア (Azure StorageやAzure SQL Databaseなど) とコンピューティング (Azure HDInsightなど) は、他のリージョンに存在できます。

  8. [作成] を選択します

  9. 作成が完了すると、その旨が通知センターに表示されます。 [リソースに移動] を選択して、 [Data Factory] ページに移動します。

  10. Open Azure Data Factory Studio を選択して、別のタブで Data Factory UI を起動します。

Data Factory マネージド Virtual NetworkでAzure IR を作成する

この手順では、Azure IR を作成し、Data Factory マネージド Virtual Networkを有効にします。

  1. Data Factory ポータルで、Manage に移動し、New を選択して新しいAzure IR を作成します。

    新しいAzure IRの作成を示すスクリーンショット

  2. [Integration runtime setup](統合ランタイムのセットアップ) ページで、必要な機能に基づいて作成する統合ランタイムを選択します。 このチュートリアルでは、Azure、セルフホステッド を選択し、Continue をクリックします。

  3. Azure を選択し、Continue をクリックして、Azure統合ランタイムを作成します。

    新しいAzure IRを示すスクリーンショットです。

  4. [仮想ネットワークの構成 (プレビュー)] で、 [有効化] を選択します。

    新しい Azure IR の有効化を示すスクリーンショット

  5. [作成] を選択します

データ フロー アクティビティが含まれるパイプラインの作成

この手順では、データ フロー アクティビティが含まれるパイプラインを作成します。

  1. Azure Data Factoryのホーム ページで、Orchestrate を選択します。

    [オーケストレーション] ボタンが強調表示されたデータ ファクトリのホームページを示すスクリーンショット。

  2. パイプラインの [プロパティ] ウィンドウで、パイプラインの名前として TransformMovies と入力します。

  3. [アクティビティ] ウィンドウで、 [移動と変換] を展開します。 Data Flow アクティビティをウィンドウからパイプライン キャンバスにドラッグします。

  4. 追加データフローポップアップで、新しいデータフローを作成を選択し、マッピングデータフローを選択します。 完了したら、 [OK] をクリックします。

    マッピング データ フローを示すスクリーンショット

  5. [プロパティ] ウィンドウで、データ フローに TransformMovies という名前を付けます。

  6. パイプライン キャンバスの上部バーで、Data Flow debug スライダーをスライドします。 デバッグ モードを使用すると、ライブ Spark クラスターに対する変換ロジックの対話型テストが可能になります。 Data Flow クラスターのウォームアップには 5 ~ 7 分かかります。ユーザーは、Data Flow開発を計画している場合は、最初にデバッグを有効にすることをお勧めします。 詳細については、デバッグ モードに関するページを参照してください。

    [データ フローのデバッグ] のスライダーを示すスクリーンショット。

データ フロー キャンバスでの変換ロジックの作成

データ フローを作成すると、データ フロー キャンバスに自動的に飛ばされます。 この手順では、Data Lake Storageで moviesDB.csv ファイルを取得し、1910 年から 2000 年までのコメディの平均評価を集計するデータ フローを構築します。 その後、このファイルをData Lake Storageに書き戻します。

ソース変換を追加する

この手順では、ソースとしてData Lake Storage Gen2を設定します。

  1. データ フロー キャンバスで [Add Source](ソースの追加) ボックスを選択して、ソースを追加します。

  2. ソースに MoviesDB という名前を付けます。 [新規] を選択して、新しいソース データセットを作成します。

  3. Azure Data Lake Storage Gen2 を選択し、Continue を選択します。

  4. [DelimitedText] を選択してから、 [続行] を選択します。

  5. データセットに MoviesDB という名前を付けます。 リンクされたサービスのドロップダウンで、 [新規] を選択します。

  6. リンクされたサービス作成画面で、Data Lake Storage Gen2リンクされたサービスに ADLSGen2 という名前を付け、認証方法を指定します。 次に、接続の資格情報を入力します。 このチュートリアルでは、アカウント キーを使用してストレージ アカウントに接続しています。

  7. [Interactive authoring](インタラクティブな作成) を必ず有効にしてください。 これは有効になるまでに 1 分程かかる場合があります。

    インタラクティブな作成を示すスクリーンショット。

  8. [接続テスト] を選択します。 プライベート エンドポイントを作成および承認しなければストレージ アカウントにアクセスできないようになっているため、これは失敗するはずです。 エラー メッセージ内に、プライベート エンドポイントを作成するためのリンクが表示されます。それをたどることで、マネージド プライベート エンドポイントを作成できます。 代わりに、 [管理] タブに直接移動し、こちらのセクションの指示に従って、マネージド プライベート エンドポイントを作成する方法もあります。

  9. ダイアログ ボックスは開いたままにして、ストレージ アカウントに移動します。

  10. このセクションの手順に従って、プライベート リンクを承認します。

  11. ダイアログ ボックスに戻ります。 もう一度 [接続テスト] を選択し、 [作成] を選択して、リンクされたサービスをデプロイします。

  12. データセットの作成画面で、 [ファイル パス] フィールドの下にファイルが配置されている場所を入力します。 このチュートリアルでは、moviesDB.csv ファイルはコンテナー sample-data に配置されています。 ファイルにはヘッダーが含まれているので、 [先頭の行を見出しとして使用] チェック ボックスをオンにします。 ストレージ内のファイルからヘッダー スキーマを直接インポートするには、 [From connection/store](接続/ストアから) を選択します。 完了したら、 [OK] をクリックします。

    ソース パスを示すスクリーンショット。

  13. デバッグ クラスターが起動している場合は、ソース変換の [データのプレビュー] タブに移動し、 [更新] を選択して、データのスナップショットを取得します。 データ プレビューを使用すると、変換が正しく構成されていることを確認できます。

    [データのプレビュー] タブを示すスクリーンショット。

マネージド プライベート エンドポイントを作成する

前述の接続をテストした際にハイパーリンクを使用しなかった場合は、パスに従います。 次に、作成したリンクされたサービスに接続するマネージド プライベート エンドポイントを作成する必要があります。

  1. [管理] タブに移動します。

    注意

    Data Factory インスタンスでは、そのすべてで [管理] タブを使用できない場合があります。 表示されない場合は、 [作成者]>[接続]>[プライベート エンドポイント] を選択して、プライベート エンドポイントにアクセスできます。

  2. [マネージド プライベート エンドポイント] セクションに移動します。

  3. [マネージド プライベート エンドポイント] で、 [+ 新規] を選択します。

    [マネージド プライベート エンドポイント] の [新規] ボタンを示すスクリーンショット。

  4. 一覧から Azure Data Lake Storage Gen2 タイルを選択し、Continue を選択します。

  5. 作成したストレージ アカウントの名前を入力します。

  6. [作成] を選択します

  7. 数秒後に、作成されたプライベート リンクに承認が必要であることが表示されます。

  8. 作成したプライベート エンドポイントを選択します。 ストレージ アカウント レベルでプライベート エンドポイントを承認できるハイパーリンクが表示されます。

    [マネージド プライベート エンドポイント] ウィンドウを示すスクリーンショット。

  1. ストレージ アカウントで、 [設定] セクションの [プライベート エンドポイント接続] に移動します。

  2. 作成したプライベート エンドポイントのチェック ボックスをオンにし、 [承認] を選択します。

    プライベート エンドポイントの [承認] ボタンを示すスクリーンショット。

  3. 説明を追加し、 [はい] を選択します。

  4. Data Factory の [管理] タブにある [マネージド プライベート エンドポイント] セクションに戻ります。

  5. 約 1 分後に、プライベート エンドポイントの承認の旨が表示されます。

フィルター変換を追加する

  1. データ フロー キャンバス上のソース ノードの横にあるプラス アイコンを選択して、新しい変換を追加します。 最初に追加する変換は、 [フィルター] です。

    フィルターの追加を示すスクリーンショット。

  2. フィルター変換に FilterYears という名前を付けます。 [フィルター適用] の横にある式ボックスを選択して、式ビルダーを開きます。 ここでフィルター条件を指定します。

    FilterYears を示すスクリーンショット。

  3. データ フローの式ビルダーでは、さまざまな変換で使用する式を対話形式で作成できます。 式には、組み込み関数、入力スキーマの列、ユーザー定義のパラメーターを含めることができます。 式の作成方法の詳細については、データ フローの式ビルダーに関するページを参照してください。

    • このチュートリアルでは、1910 年から 2000 年の間に公開された、ジャンルがコメディの映画をフィルター処理します。 現在、年は文字列になっているため、toInteger() 関数を使用して整数に変換する必要があります。 以上演算子 (>=) と以下演算子 (<=) を使って、年のリテラル値 1910 と 2000 に対する比較を行います。 これらの式を and (&&) 演算子を使用して結合します。 式は次のようになります。

      toInteger(year) >= 1910 && toInteger(year) <= 2000

    • コメディ映画を見つけるには、rlike() 関数を使用して、ジャンル列でパターン 'Comedy' を検索します。 rlike 式を年の比較と結合すると、次の式が得られます。

      toInteger(year) >= 1910 && toInteger(year) <= 2000 && rlike(genres, 'Comedy')

    • デバッグ クラスターがアクティブになっている場合は、 [更新] を選択して使用された入力と比較した式の出力を表示して、ロジックを確認できます。 データ フローの式言語を使用してこのロジックを実現する方法に対する正解は複数あります。

      フィルター式を示すスクリーンショット。

    • 式の編集を完了したら、「保存して終了」 を選択してください。

  4. フィルターが正しく機能していることを確認するには、データ プレビューをフェッチします。

    フィルター処理されたデータのプレビューを示すスクリーンショット。

集計変換を追加する

  1. 次に追加する変換は、 [Schema modifier](スキーマ修飾子) の下にある [集計] 変換です。

    集計の追加を示すスクリーンショット。

  2. 集計変換に AggregateComedyRating という名前を付けます。 [グループ化] タブで、ドロップダウン ボックスから [year] を選択し、映画の公開年ごとに集計をグループ化します。

    集計グループを示すスクリーンショット。

  3. [集計] タブに移動します。左側のテキスト ボックスで、集計列に AverageComedyRating という名前を付けます。 式ビルダーを使用して集計式を入力するには、右側の式ボックスを選択します。

    集計列の名前を示すスクリーンショット。

  4. [Rating] の平均値を取得するには、avg() 集計関数を使用します。 [Rating] は文字列で、avg() で受け入れられるのは数値入力なので、toInteger() 関数を使用して値を数値に変換する必要があります。 表現は次のようになります。

    avg(toInteger(Rating))

  5. 完了したら、 [Save and Finish](保存して終了する) を選択します。

    集計の保存を示すスクリーンショット。

  6. 変換出力を表示するには、 [Data Preview](データのプレビュー) タブに移動します。 yearAverageComedyRating の 2 つの列だけがあることに注目してください。

シンク変換を追加する

  1. 次に、 [Destination](変換先) の下で [シンク] 変換を追加します。

    シンクの追加を示すスクリーンショット。

  2. シンクに Sink という名前を付けます。 [新規] を選択して、シンク データセットを作成します。

    シンクの作成を示すスクリーンショット。

  3. 新しいデータセット ページで、Azure Data Lake Storage Gen2 を選択し、 Continue を選択します。

  4. [形式の選択] ページで、 [DelimitedText] を選択してから、 [続行] を選択します。

  5. シンク データセットに MoviesSink という名前を付けます。 リンクされたサービスでは、ソース変換用に作成したものと同じ ADLSGen2 のリンクされたサービスを選択します。 データの書き込み先となる出力フォルダーを入力します。 このチュートリアルでは、コンテナー sample-data 内のフォルダー output に書き込んでいます。 フォルダーは、事前に存在している必要はなく、動的に作成することができます。 [先頭の行を見出しとして使用] チェック ボックスを選択し、 [スキーマのインポート][なし] を選択します。 [OK] を選択します。

    シンク パスを示すスクリーンショット。

これで、データ フローの構築が完了しました。 これをパイプラインで実行する準備ができました。

データ フローの実行と監視

パイプラインを発行する前にデバッグすることができます。 この手順では、データ フロー パイプラインのデバッグ実行をトリガーします。 データのプレビューではデータが書き込まれませんが、デバッグ実行ではシンクの変換先にデータが書き込まれます。

  1. パイプライン キャンバスへ移動してください。 [デバッグ] を選択して、デバッグ実行をトリガーします。

  2. データ フロー アクティビティのパイプライン デバッグでは、アクティブなデバッグ クラスターが使用されますが、それでも初期化には少なくとも 1 分かかります。 進行状況は [出力] タブで追跡することができます。実行が正常に完了したら、眼鏡のアイコンを選択して実行の詳細を確認します。

  3. 詳細ページには、各変換手順で使用した行の数と所要時間が表示されます。

    監視実行を示すスクリーンショット。

  4. 変換を選択すると、データの列とパーティション分割に関する詳細情報が表示されます。

このチュートリアルに正しく従った場合は、シンク フォルダーに 83 行と 2 列が書き込まれているはずです。 BLOB ストレージをチェックすることで、データが正しいことを確認できます。

まとめ

このチュートリアルでは、Data Factory UI を使用して、Data Factory Managed Virtual Network のマッピング データ フローを使用して、Data Lake Storage Gen2 ソースからData Lake Storage Gen2 シンクにデータをコピーして変換するパイプラインを作成しました (両方とも、選択したネットワークへのアクセスのみを許可します)。