適用対象:
Azure Data Factory
Azure Synapse Analytics
ヒント
Data Factory in Microsoft Fabric は、よりシンプルなアーキテクチャ、組み込みの AI、および新機能を備えた次世代のAzure Data Factoryです。 データ統合を初めて使用する場合は、Fabric Data Factory から始めます。 既存の ADF ワークロードをFabricにアップグレードして、データ サイエンス、リアルタイム分析、レポートの新機能にアクセスできます。
Azure Data Factoryを初めて使用する場合は、Azure Data Factory の概要を参照してください。
このチュートリアルでは、Data Factory ユーザー インターフェイス (UI) を使用して、Azure Data Lake Storage Gen2 ソースから Data Lake Storage Gen2 シンクにデータ
このチュートリアルでは、次の手順を実行します。
- データ ファクトリを作成します。
- データ フロー アクティビティが含まれるパイプラインを作成します。
- 4 つの変換を使用して、マッピング データ フローを構築します。
- パイプラインをテスト実行します。
- データ フロー アクティビティを監視します。
前提条件
- Azure サブスクリプション。 Azure サブスクリプションをお持ちでない場合は、開始する前に、free Azure アカウントを作成します。
- Azure ストレージ アカウント。 Data Lake Storageは、source および sink データ ストアとして使用します。 ストレージ アカウントをお持ちでない場合は、「Azure ストレージ アカウントを作成する」を参照してください。 ストレージ アカウントで、選択したネットワークからのアクセスのみが許可されていることを確認します。
このチュートリアルで変換するファイルは moviesDB.csvであり、このGitHubコンテンツ サイトにあります。 GitHubからファイルを取得するには、任意のテキスト エディターに内容をコピーして、.csv ファイルとしてローカルに保存します。 ストレージ アカウントにファイルをアップロードするには、Azure ポータルで BLOB をアップロードするを参照してください。 例では、sample-data という名前のコンテナーを参照します。
Data Factory の作成
この手順では、データ ファクトリを作成し、Data Factory UI を開いて、データ ファクトリにパイプラインを作成します。
Microsoft Edgeまたは Google Chrome を開きます。 現在、Data Factory UI をサポートするのは、Microsoft Edgeブラウザーと Google Chrome Web ブラウザーのみです。
左側のメニューで、 [リソースの作成]>[分析]>[Data Factory] の順に選択します。
[新しいデータ ファクトリ] ページで、 [名前] に「ADFTutorialDataFactory」と入力します。
データ ファクトリの名前は "グローバルに一意" にする必要があります。 データ ファクトリの名前の値に関するエラー メッセージが表示された場合は、別の名前を入力してください (yournameADFTutorialDataFactory など)。 Data Factory アーティファクトの名前付け規則については、Data Factory の名前付け規則に関するページを参照してください。
データ ファクトリを作成する Azure サブスクリプションを選択します。
[リソース グループ] で、次の手順のいずれかを行います。
- [Use existing (既存のものを使用)] を選択し、ドロップダウン リストから既存のリソース グループを選択します。
- [新規作成] を選択し、リソース グループの名前を入力します。
リソース グループの詳細については、「リソース グループを使用してAzure リソースを管理するを参照してください。
[バージョン] で、 [V2] を選択します。
[場所] で、データ ファクトリの場所を選択します。 サポートされている場所のみがドロップダウン リストに表示されます。 データ ファクトリで使用されるデータ ストア (Azure StorageやAzure SQL Databaseなど) とコンピューティング (Azure HDInsightなど) は、他のリージョンに存在できます。
[作成] を選択します
作成が完了すると、その旨が通知センターに表示されます。 [リソースに移動] を選択して、 [Data Factory] ページに移動します。
Open Azure Data Factory Studio を選択して、別のタブで Data Factory UI を起動します。
Data Factory マネージド Virtual NetworkでAzure IR を作成する
この手順では、Azure IR を作成し、Data Factory マネージド Virtual Networkを有効にします。
Data Factory ポータルで、Manage に移動し、New を選択して新しいAzure IR を作成します。
新しいAzure IRの作成を示すスクリーンショット
[Integration runtime setup](統合ランタイムのセットアップ) ページで、必要な機能に基づいて作成する統合ランタイムを選択します。 このチュートリアルでは、Azure、セルフホステッド を選択し、Continue をクリックします。
Azure を選択し、Continue をクリックして、Azure統合ランタイムを作成します。
新しいAzure IRを示すスクリーンショットです。 [仮想ネットワークの構成 (プレビュー)] で、 [有効化] を選択します。
[作成] を選択します
データ フロー アクティビティが含まれるパイプラインの作成
この手順では、データ フロー アクティビティが含まれるパイプラインを作成します。
Azure Data Factoryのホーム ページで、Orchestrate を選択します。
パイプラインの [プロパティ] ウィンドウで、パイプラインの名前として TransformMovies と入力します。
[アクティビティ] ウィンドウで、 [移動と変換] を展開します。 Data Flow アクティビティをウィンドウからパイプライン キャンバスにドラッグします。
追加データフローポップアップで、新しいデータフローを作成を選択し、マッピングデータフローを選択します。 完了したら、 [OK] をクリックします。
マッピング データ フローを示すスクリーンショット [プロパティ] ウィンドウで、データ フローに TransformMovies という名前を付けます。
パイプライン キャンバスの上部バーで、Data Flow debug スライダーをスライドします。 デバッグ モードを使用すると、ライブ Spark クラスターに対する変換ロジックの対話型テストが可能になります。 Data Flow クラスターのウォームアップには 5 ~ 7 分かかります。ユーザーは、Data Flow開発を計画している場合は、最初にデバッグを有効にすることをお勧めします。 詳細については、デバッグ モードに関するページを参照してください。
データ フロー キャンバスでの変換ロジックの作成
データ フローを作成すると、データ フロー キャンバスに自動的に飛ばされます。 この手順では、Data Lake Storageで moviesDB.csv ファイルを取得し、1910 年から 2000 年までのコメディの平均評価を集計するデータ フローを構築します。 その後、このファイルをData Lake Storageに書き戻します。
ソース変換を追加する
この手順では、ソースとしてData Lake Storage Gen2を設定します。
データ フロー キャンバスで [Add Source](ソースの追加) ボックスを選択して、ソースを追加します。
ソースに MoviesDB という名前を付けます。 [新規] を選択して、新しいソース データセットを作成します。
Azure Data Lake Storage Gen2 を選択し、Continue を選択します。
[DelimitedText] を選択してから、 [続行] を選択します。
データセットに MoviesDB という名前を付けます。 リンクされたサービスのドロップダウンで、 [新規] を選択します。
リンクされたサービス作成画面で、Data Lake Storage Gen2リンクされたサービスに ADLSGen2 という名前を付け、認証方法を指定します。 次に、接続の資格情報を入力します。 このチュートリアルでは、アカウント キーを使用してストレージ アカウントに接続しています。
[Interactive authoring](インタラクティブな作成) を必ず有効にしてください。 これは有効になるまでに 1 分程かかる場合があります。
[接続テスト] を選択します。 プライベート エンドポイントを作成および承認しなければストレージ アカウントにアクセスできないようになっているため、これは失敗するはずです。 エラー メッセージ内に、プライベート エンドポイントを作成するためのリンクが表示されます。それをたどることで、マネージド プライベート エンドポイントを作成できます。 代わりに、 [管理] タブに直接移動し、こちらのセクションの指示に従って、マネージド プライベート エンドポイントを作成する方法もあります。
ダイアログ ボックスは開いたままにして、ストレージ アカウントに移動します。
このセクションの手順に従って、プライベート リンクを承認します。
ダイアログ ボックスに戻ります。 もう一度 [接続テスト] を選択し、 [作成] を選択して、リンクされたサービスをデプロイします。
データセットの作成画面で、 [ファイル パス] フィールドの下にファイルが配置されている場所を入力します。 このチュートリアルでは、moviesDB.csv ファイルはコンテナー sample-data に配置されています。 ファイルにはヘッダーが含まれているので、 [先頭の行を見出しとして使用] チェック ボックスをオンにします。 ストレージ内のファイルからヘッダー スキーマを直接インポートするには、 [From connection/store](接続/ストアから) を選択します。 完了したら、 [OK] をクリックします。
デバッグ クラスターが起動している場合は、ソース変換の [データのプレビュー] タブに移動し、 [更新] を選択して、データのスナップショットを取得します。 データ プレビューを使用すると、変換が正しく構成されていることを確認できます。
マネージド プライベート エンドポイントを作成する
前述の接続をテストした際にハイパーリンクを使用しなかった場合は、パスに従います。 次に、作成したリンクされたサービスに接続するマネージド プライベート エンドポイントを作成する必要があります。
[管理] タブに移動します。
注意
Data Factory インスタンスでは、そのすべてで [管理] タブを使用できない場合があります。 表示されない場合は、 [作成者]>[接続]>[プライベート エンドポイント] を選択して、プライベート エンドポイントにアクセスできます。
[マネージド プライベート エンドポイント] セクションに移動します。
[マネージド プライベート エンドポイント] で、 [+ 新規] を選択します。
一覧から Azure Data Lake Storage Gen2 タイルを選択し、Continue を選択します。
作成したストレージ アカウントの名前を入力します。
[作成] を選択します
数秒後に、作成されたプライベート リンクに承認が必要であることが表示されます。
作成したプライベート エンドポイントを選択します。 ストレージ アカウント レベルでプライベート エンドポイントを承認できるハイパーリンクが表示されます。
ストレージ アカウントでプライベート リンクを承認する
ストレージ アカウントで、 [設定] セクションの [プライベート エンドポイント接続] に移動します。
作成したプライベート エンドポイントのチェック ボックスをオンにし、 [承認] を選択します。
説明を追加し、 [はい] を選択します。
Data Factory の [管理] タブにある [マネージド プライベート エンドポイント] セクションに戻ります。
約 1 分後に、プライベート エンドポイントの承認の旨が表示されます。
フィルター変換を追加する
データ フロー キャンバス上のソース ノードの横にあるプラス アイコンを選択して、新しい変換を追加します。 最初に追加する変換は、 [フィルター] です。
フィルター変換に FilterYears という名前を付けます。 [フィルター適用] の横にある式ボックスを選択して、式ビルダーを開きます。 ここでフィルター条件を指定します。
データ フローの式ビルダーでは、さまざまな変換で使用する式を対話形式で作成できます。 式には、組み込み関数、入力スキーマの列、ユーザー定義のパラメーターを含めることができます。 式の作成方法の詳細については、データ フローの式ビルダーに関するページを参照してください。
このチュートリアルでは、1910 年から 2000 年の間に公開された、ジャンルがコメディの映画をフィルター処理します。 現在、年は文字列になっているため、
toInteger()関数を使用して整数に変換する必要があります。 以上演算子 (>=) と以下演算子 (<=) を使って、年のリテラル値 1910 と 2000 に対する比較を行います。 これらの式を and (&&) 演算子を使用して結合します。 式は次のようになります。toInteger(year) >= 1910 && toInteger(year) <= 2000コメディ映画を見つけるには、
rlike()関数を使用して、ジャンル列でパターン 'Comedy' を検索します。rlike式を年の比較と結合すると、次の式が得られます。toInteger(year) >= 1910 && toInteger(year) <= 2000 && rlike(genres, 'Comedy')デバッグ クラスターがアクティブになっている場合は、 [更新] を選択して使用された入力と比較した式の出力を表示して、ロジックを確認できます。 データ フローの式言語を使用してこのロジックを実現する方法に対する正解は複数あります。
式の編集を完了したら、「保存して終了」 を選択してください。
フィルターが正しく機能していることを確認するには、データ プレビューをフェッチします。
集計変換を追加する
次に追加する変換は、 [Schema modifier](スキーマ修飾子) の下にある [集計] 変換です。
集計変換に AggregateComedyRating という名前を付けます。 [グループ化] タブで、ドロップダウン ボックスから [year] を選択し、映画の公開年ごとに集計をグループ化します。
[集計] タブに移動します。左側のテキスト ボックスで、集計列に AverageComedyRating という名前を付けます。 式ビルダーを使用して集計式を入力するには、右側の式ボックスを選択します。
列 [Rating] の平均値を取得するには、
avg()集計関数を使用します。 [Rating] は文字列で、avg()で受け入れられるのは数値入力なので、toInteger()関数を使用して値を数値に変換する必要があります。 表現は次のようになります。avg(toInteger(Rating))完了したら、 [Save and Finish](保存して終了する) を選択します。
変換出力を表示するには、 [Data Preview](データのプレビュー) タブに移動します。 year と AverageComedyRating の 2 つの列だけがあることに注目してください。
シンク変換を追加する
次に、 [Destination](変換先) の下で [シンク] 変換を追加します。
シンクに Sink という名前を付けます。 [新規] を選択して、シンク データセットを作成します。
新しいデータセット ページで、Azure Data Lake Storage Gen2 を選択し、 Continue を選択します。
[形式の選択] ページで、 [DelimitedText] を選択してから、 [続行] を選択します。
シンク データセットに MoviesSink という名前を付けます。 リンクされたサービスでは、ソース変換用に作成したものと同じ ADLSGen2 のリンクされたサービスを選択します。 データの書き込み先となる出力フォルダーを入力します。 このチュートリアルでは、コンテナー sample-data 内のフォルダー output に書き込んでいます。 フォルダーは、事前に存在している必要はなく、動的に作成することができます。 [先頭の行を見出しとして使用] チェック ボックスを選択し、 [スキーマのインポート] で [なし] を選択します。 [OK] を選択します。
これで、データ フローの構築が完了しました。 これをパイプラインで実行する準備ができました。
データ フローの実行と監視
パイプラインを発行する前にデバッグすることができます。 この手順では、データ フロー パイプラインのデバッグ実行をトリガーします。 データのプレビューではデータが書き込まれませんが、デバッグ実行ではシンクの変換先にデータが書き込まれます。
パイプライン キャンバスへ移動してください。 [デバッグ] を選択して、デバッグ実行をトリガーします。
データ フロー アクティビティのパイプライン デバッグでは、アクティブなデバッグ クラスターが使用されますが、それでも初期化には少なくとも 1 分かかります。 進行状況は [出力] タブで追跡することができます。実行が正常に完了したら、眼鏡のアイコンを選択して実行の詳細を確認します。
詳細ページには、各変換手順で使用した行の数と所要時間が表示されます。
変換を選択すると、データの列とパーティション分割に関する詳細情報が表示されます。
このチュートリアルに正しく従った場合は、シンク フォルダーに 83 行と 2 列が書き込まれているはずです。 BLOB ストレージをチェックすることで、データが正しいことを確認できます。
まとめ
このチュートリアルでは、Data Factory UI を使用して、Data Factory Managed Virtual Network のマッピング データ フローを使用して、Data Lake Storage Gen2 ソースからData Lake Storage Gen2 シンクにデータをコピーして変換するパイプラインを作成しました (両方とも、選択したネットワークへのアクセスのみを許可します)。