次の方法で共有


Azure Databricksを使用した変換

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

Data Factory in Microsoft Fabric は、よりシンプルなアーキテクチャ、組み込みの AI、および新機能を備えた次世代のAzure Data Factoryです。 データ統合を初めて使用する場合は、Fabric Data Factory から始めます。 既存の ADF ワークロードをFabricにアップグレードして、データ サイエンス、リアルタイム分析、レポートの新機能にアクセスできます。

このチュートリアルでは、Azure Data Factoryの ValidationCopy data、および Notebook アクティビティを含むエンド ツー エンドのパイプラインを作成します。

  • 検証は、コピーおよび分析ジョブをトリガーする前に、ソース データセットがダウンストリームで使用できる状態にします。

  • Copy data は、ソース データセットをシンク ストレージに複製し、Azure Databricks ノートブックに DBFS としてマウントします。 このようにして、データセットを Spark で直接使用することができます。

  • ノートブックによって、データセットを変換する Databricks Notebook がトリガーされます。 また、処理されたフォルダーまたはAzure Synapse Analyticsにデータセットが追加されます。

わかりやすくするために、このチュートリアルのテンプレートではスケジュールされたトリガーを作成しません。 必要に応じて、1 つを追加できます。

パイプラインの図

前提条件

  • シンクとして使用する sinkdata というコンテナーを持つAzure BLOB ストレージ アカウント。

    ストレージ アカウント名、コンテナー名、アクセス キーをメモしておきます。 これらの値は、後でテンプレートの中で必要になります。

  • Azure Databricks ワークスペース。

変換するノートブックをインポートする

変換ノートブックを Databricks ワークスペースにインポートするには、次のようにします。

  1. Azure Databricks ワークスペースにサインインします。

  2. ワークスペース内のフォルダーを右クリックし、[インポート] を選択します。

  3. インポート元:URL を選択します。 テキスト ボックスに、「https://adflabstaging1.blob.core.windows.net/share/Transformations.html」と入力します。

    ノートブックをインポートするための選択

  4. それでは、[変換]ノートブックをあなたのストレージ接続情報で更新しましょう。

    インポートしたノートブックで、次のコード スニペットに示すようにコマンド 5 にアクセスします。

    • <storage name><access key> を実際のストレージ接続情報に置き換えます。
    • sinkdata コンテナーでストレージ アカウントを使用します。
    # Supply storageName and accessKey values  
    storageName = "<storage name>"  
    accessKey = "<access key>"  
    
    try:  
      dbutils.fs.mount(  
        source = "wasbs://sinkdata\@"+storageName+".blob.core.windows.net/",  
        mount_point = "/mnt/Data Factorydata",  
        extra_configs = {"fs.azure.account.key."+storageName+".blob.core.windows.net": accessKey})  
    
    except Exception as e:  
      # The error message has a long stack track. This code tries to print just the relevant line indicating what failed.
    
    import re
    result = re.findall(r"\^\s\*Caused by:\s*\S+:\s\*(.*)\$", e.message, flags=re.MULTILINE)
    if result:
      print result[-1] \# Print only the relevant error message
    else:  
      print e \# Otherwise print the whole stack trace.  
    
  5. Data Factory で Databricks にアクセスするための Databricks アクセス トークンを生成します。

    1. Azure Databricks ワークスペースで、上部のバーでAzure Databricksユーザー名を選択し、ドロップダウンから [設定] を選択します。
    2. [開発者モード] を選択します。
    3. 次に、[アクセス トークン] の横にある [管理] を選択します。
    4. [新しいトークンの生成] を選択します。
    5. (省略可能) 将来このトークンを識別するのに役立つコメントを入力し、トークンの既定の有効期間 90 日を変更します。 有効期間のないトークンを作成するには (推奨されません)、[有効期間 (日)] ボックスを空のままにします。
    6. 生成を選択します。
    7. 表示されたトークンを安全な場所にコピーし、[完了] を選択します。

Databricks のリンクされたサービスの作成に後で使用するためにアクセス トークンを保存します。 アクセス トークンは、dapi32db32cbb4w6eee18b7d87e45exxxxxx のようになります。

このテンプレートの使用方法

  1. Transformation with Azure Databricks テンプレートに移動し、次の接続用の新しいリンクされたサービスを作成します。

    接続の設定

    • ソース BLOB 接続 - ソース データにアクセスするため。

      この演習では、ソース ファイルが格納されているパブリック BLOB ストレージを使用できます。 構成については、次のスクリーンショットを参照してください。 次の [SAS URL] を使用して、ソース ストレージに接続します (読み取り専用アクセス)。

      https://storagewithdata.blob.core.windows.net/data?sv=2018-03-28&si=read%20and%20list&sr=c&sig=PuyyS6%2FKdB2JxcZN0kPlmHSBlD8uIKyzhBWmWzznkBw%3D

      認証方法と SAS URL の選択

    • コピー先 Blob 接続 - コピーしたデータを保存するためのもの。

      [New Linked Service](新しいリンクされたサービス) ウィンドウで、シンク ストレージ BLOB を選択します。

      新しいリンクされたサービスとしてのシンク ストレージ BLOB

    • Azure Databricks - Databricks クラスターに接続します。

      前に生成したアクセス キーを使用して、Databricks にリンクされたサービスを作成します。 ある場合は、対話型クラスターを選択することもできます。 この例では、 [New job cluster](新しいジョブ クラスター) オプションを使用します。

      クラスターに接続するための選択

  2. [このテンプレートを使用] を選択します。 作成されたパイプラインが表示されます。

    パイプラインを作成する

パイプラインの概要と構成

新しいパイプラインのほとんどの設定は、既定値で自動的に構成されています。 パイプラインの構成を確認し、必要に応じて変更します。

  1. [検証] アクティビティの [Availability flag](可用性フラグ) で、ソース データセットの値が、先ほど作成した SourceAvailabilityDataset に設定されていることを確認します。

    ソース データセットの値

  2. データ コピー アクティビティの [file-to-blob] で、 [ソース] タブと [シンク] タブを確認します。 必要に応じて設定を変更します。

    • [ソース] タブ [ソース] タブ

    • [シンク] タブ [シンク] タブ

  3. ノートブック アクティビティの変換で、必要に応じてパスと設定を確認し、更新します。

    [Databricks リンク サービス] には、次に示すように、前の手順の値が事前に設定されます。Databricks リンク サービスに設定されている値

    ノートブック設定を確認するには、次のようにします。

    1. [設定] タブを選択します。ノートブック パスについては、既定のパスが正しいことを確認します。 正しいノートブック パスを参照して、選択する必要がある場合があります。

      ノートブック パス

    2. [基本パラメーター] セレクターを展開し、パラメーターが次のスクリーンショットに示されている内容と一致していることを確認します。 これらのパラメーターは Data Factory から Databricks Notebook に渡されます。

      基本パラメーター

  4. パイプラインのパラメーターが、次のスクリーンショットに示されている内容と一致していることを確認します。パイプラインのパラメーター

  5. データセットに接続します。

    注記

    以下のデータ セットでは、ファイルのパスはテンプレートで自動的に指定されています。 変更が必要な場合は、接続エラーが発生した場合に備えて、containerdirectory の両方のパスを指定してください。

    • SourceAvailabilityDataset - ソース データが使用可能であることを確認します。

      SourceAvailabilityDataset のリンクされたサービスとファイル パスの選択

    • SourceFilesDataset - ソース データにアクセスします。

      SourceFilesDataset のリンクされたサービスとファイル パスの選択

    • DestinationFilesDataset - シンクのターゲットの場所にデータをコピーします。 次の値を使用します。

      • リンクされたサービス - sinkBlob_LS前のステップで作成済み。

      • ファイル パス - sinkdata/staged_sink

        DestinationFilesDataset のリンクされたサービスとファイル パスの選択

  6. [デバッグ] を選択してパイプラインを実行します。 より詳細な Spark のログについては、Databricks のログへのリンクを検索できます。

    出力から Databricks ログへのリンク

    Azure Storage Explorerを使用してデータ ファイルを確認することもできます。

    注記

    Data Factory のパイプラインの実行と関連付けるため、この例では Data Factory からのパイプライン実行 ID を出力フォルダーに追加しています。 これにより、実行ごとに生成されたファイルを追跡することができます。 追加されたパイプラインラン ID