次の方法で共有


実稼働ワークロード用に自動ローダーを構成する

Databricks では、増分データ インジェストに Lakeflow Spark 宣言パイプラインで自動ローダーを使用することをお勧めします。 Lakeflow Spark 宣言型パイプラインは、Apache Spark Structured Streaming の機能を拡張し、数行の宣言型Pythonまたは SQL を記述して、運用品質のデータ パイプラインをデプロイできます。

また、Databricks では、運用環境で自動ローダーを実行するためのストリーミングのベスト プラクティスに従うことをお勧めします。 「構造化ストリーミングの運用に関する考慮事項」を参照してください。

自動ローダーの監視

次のセクションでは、メトリック、ログ、アラート、一般的なトラブルシューティング ワークフローなど、運用環境で自動ローダーを監視する方法について説明します。

自動ローダーによって検出されたファイルのクエリ

自動ローダーは、ストリームの状態を検査するための SQL API を提供します。 cloud_files_state 関数を使用して、自動ローダー ストリームによって検出されたファイルに関するメタデータを見つけることができます。 クエリ cloud_files_state。自動ローダー ストリームに関連付けられているチェックポイントの場所を指定します。

cloud_files_state 関数は、Databricks Runtime 11.3 LTS 以降で使用できます。

SELECT * FROM cloud_files_state('path/to/checkpoint');

ストリームの更新を監視する

自動ローダー ストリームをさらに監視するために、Databricks では Apache Spark のストリーミング クエリ リスナー インターフェイスを使用することをお勧めします。 Azure Databricks の Monitoring Structured Streaming クエリを参照してください。

バッチごとに自動ローダーからストリーミング クエリ リスナーにメトリックが報告されます。 バックログに存在するファイルの数と、バックログのサイズを、ストリーミング クエリの進行状況ダッシュボードにある numFilesOutstanding タブの numBytesOutstanding メトリックと メトリックで表示できます。

{
  "sources": [
    {
      "description": "CloudFilesSource[/path/to/source]",
      "metrics": {
        "numFilesOutstanding": "238",
        "numBytesOutstanding": "163939124006"
      }
    }
  ]
}

Databricks Runtime 10.4 LTS 以降でファイル通知モードを使用する場合、メトリックには、AWS および Azure の approximateQueueSize としてクラウド キュー内のファイル イベントの概数も含まれます。

コストに関する考慮事項

自動ローダーを実行する場合、コストの主なソースはコンピューティング リソースとファイル検出です。

コンピューティング コストを削減するために、Databricks では、待機時間の要件が低くない限り、継続的に実行するのではなく、 Trigger.AvailableNow を使用して自動ローダーをバッチ ジョブとしてスケジュールする Lakeflow ジョブを使用することをお勧めします。 「構造化ストリーミングのトリガー間隔を構成する」を参照してください。 これらのバッチ ジョブは、 ファイル到着トリガー を使用してトリガーして、ファイル到着と処理の間の待機時間をさらに短縮できます。

ファイル検出のコストは、ディレクトリ 一覧表示モードのストレージ アカウントに対する LIST 操作と、サブスクリプション サービスの API 要求、およびファイル通知モードのキュー サービスの形式で発生する可能性があります。 ファイル検出コストを削減するために、Databricks では次のように推奨しています。

ソース データのリテンション期間

Databricks Runtime 16.4 LTS 以降で使用できます。

ソース ディレクトリにファイルが蓄積されると、ストレージ コストが増加し、特にディレクトリ一覧モードでファイルの検出が遅くなります。 自動ローダーには、処理後にファイルをアーカイブまたは削除することで、ファイルの保持期間を自動的に管理する cloudFiles.cleanSource オプションが用意されています。

ソース ディレクトリ内のファイルをアーカイブしてコストを削減する

警告

  • cloudFiles.cleanSource設定すると、ソース ディレクトリ内のファイルが削除または移動されます。
  • データ処理に foreachBatch を使用すると、操作でバッチ内のファイルのサブセットのみが使用された場合でも、 foreachBatch 操作が正常に戻るとすぐに、ファイルは移動または削除候補になります。

Databricks では、検出コストを削減するために、ファイル イベントで自動ローダーを使用することをお勧めします。 これにより、検出が増分されるため、コンピューティング コストも削減されます。

ファイル イベントを使用できず、ディレクトリ一覧を使用してファイルを検出する必要がある場合は、 cloudFiles.cleanSource オプションを使用して、自動ローダーがファイルを処理した後に自動的にアーカイブまたは削除して、検出コストを削減できます。 自動ローダーは、処理後にソース ディレクトリからファイルをクリーンアップするため、検出時に一覧表示する必要があるファイルが少なくなります。

cloudFiles.cleanSource オプションでMOVEを使用する場合は、次の要件を考慮してください。

  • ソース ディレクトリと移動先の移動ディレクトリの両方が、同じバケットまたはコンテナーに配置されている必要があります。 クロスバケットとクロスコンテナーの移動はサポートされていないため、エラーが発生します。
  • 移動先には、ボリューム パス ( /Volumes/my_catalog/my_schema/my_volume/archive/など) を指定できます。
  • ソース ディレクトリと移行先ディレクトリが同じ外部の場所にある場合は、マネージド ストレージ (マネージド ボリュームやカタログなど) を含む兄弟ディレクトリを含めないようにする必要があります。 このような場合、自動ローダーは宛先ディレクトリに書き込むのに必要なアクセス許可を取得できません。

Databricks では、次の場合にこのオプションを使用することをお勧めします。

  • ソース ディレクトリには、時間の経過と同時に大量のファイルが蓄積されます。
  • コンプライアンスまたは監査のために処理されたファイルを保持する必要があります ( cloudFiles.cleanSourceMOVEに設定します)。
  • インジェスト後にファイルを削除してストレージ コストを削減する ( cloudFiles.cleanSourceDELETE に設定する)。 DELETE モードを使用する場合、Databricks では、自動ローダーの削除が論理的な削除として機能し、構成ミスが発生した場合に使用できるように、バケットでバージョン管理を有効にすることをお勧めします。 さらに、Databricks では、復旧要件に基づいて、指定された猶予期間 (60 日や 90 日など) 後に古い論理的に削除されたバージョンを消去するようにクラウド ライフサイクル ポリシーを設定することをお勧めします。

cleanSourceオプションとその既定値の完全なリファレンスについては、cloudFiles.cleanSource を参照してください。

処理されたファイルをコールド ストレージ パスに移動する

次の例では、処理されたファイルを 14 日後に同じバケット内のアーカイブ ディレクトリに移動するように自動ローダーを構成します。 アーカイブ パスにクラウド ライフサイクル ポリシーを適用して、ファイルをより安価なストレージ層 (AWS S3 Glacier、Azureクール/アーカイブ、GCS コールドライン/アーカイブなど) に移行できます。

Python

# Step 1: Configure Auto Loader to move processed files to an archive path.
checkpoint = "/Volumes/my_catalog/my_schema/my_volume/checkpoints/ingest_stream"
archive_path = "s3://my-bucket/archive/landing/"

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.cleanSource", "MOVE")
  .option("cloudFiles.cleanSource.moveDestination", archive_path)
  .option("cloudFiles.cleanSource.retentionDuration", "14 days")
  .option("cloudFiles.schemaLocation", checkpoint)
  .load("s3://my-bucket/landing/")
)

# Step 2: Write to a Delta table.
(df.writeStream
  .option("checkpointLocation", checkpoint)
  .trigger(availableNow=True)
  .toTable("my_catalog.my_schema.raw_events")
)

# Step 3 (outside Databricks): Set up a cloud lifecycle policy on the
# archive path to transition files to cold storage after a grace period.
# For example, in AWS you can configure an S3 Lifecycle rule to move
# objects under s3://my-bucket/archive/landing/ to S3 Glacier after
# 30 days.

SQL

-- Step 1: Configure Auto Loader to move processed files to an archive path
-- using a Lakeflow Declarative Pipeline.
CREATE OR REFRESH STREAMING TABLE raw_events
AS SELECT * FROM STREAM read_files(
  's3://my-bucket/landing/',
  format => 'json',
  cleanSource => 'MOVE',
  cleanSourceMoveDestination => 's3://my-bucket/archive/landing/',
  cleanSourceRetentionDuration => '14 days'
);

-- Step 2 (outside Databricks): Set up a cloud lifecycle policy on the
-- archive path to transition files to cold storage.
-- For example, in AWS configure an S3 Lifecycle rule to move objects
-- under s3://my-bucket/archive/landing/ to S3 Glacier after 30 days.

Trigger.AvailableNow とレート制限の使用

Databricks Runtime 10.4 LTS 以降で使用できます。

自動ローダーは、 Trigger.AvailableNowを使用して、Lakeflow ジョブでバッチ ジョブとして実行するようにスケジュールできます。 AvailableNow トリガーは、クエリの開始時刻より前に到着したすべてのファイルを処理するように自動ローダーに指示します。 ストリームの開始後に到着した新しいファイルは、次のトリガーまで無視されます。

Trigger.AvailableNow を使用すると、ファイルの検出はデータ処理と非同期に行われ、データはレート制限付きの複数のマイクロバッチ間で処理できます。 自動ローダーでは、既定で、マイクロバッチごとに最大 1,000 個のファイルが処理されます。 cloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTrigger を構成して、マイクロバッチで処理する必要があるファイル数またはバイト数を構成できます。 ファイル制限はハード制限ですが、バイト制限はソフト制限です。つまり、指定された maxBytesPerTrigger より多くのバイトが処理されることがあります。 両方のオプションが一緒に指定されている場合、自動ローダーでは、いずれかの制限に達するために必要な数のファイルを処理します。

チェックポイントの場所

チェックポイントの場所は、ストリームの状態と進行状況の情報を格納するために使用されます。 Databricks では、チェックポイントの場所をクラウド オブジェクト ライフサイクル ポリシーのない場所に設定することをお勧めします。 チェックポイントの場所にあるファイルがポリシーに従ってクリーンアップされると、ストリームの状態が破損します。 この場合は、ストリームを最初から再起動する必要があります。

ファイル イベントの追跡

自動ローダーは、RocksDB を使用してチェックポイントの場所で検出されたファイルを追跡し、厳密に 1 回のインジェスト保証を提供します。 大量または有効期間の長いインジェスト ストリームの場合、Databricks では Databricks Runtime 15.4 LTS 以降へのアップグレードをお勧めします。 これらのバージョンでは、自動ローダーはストリームが開始される前に RocksDB 状態全体がダウンロードされるのを待つことはありません。これにより、ストリームの起動時間が短縮される可能性があります。 ファイルの状態が制限なく拡大しないようにする場合は、 cloudFiles.maxFileAge オプションを使用して、特定の年齢より古いファイル イベントを期限切れにすることを検討することもできます。 cloudFiles.maxFileAge に設定できる最小値は "14 days" です。 RocksDBの削除は、墓石エントリとして表されます。 そのため、イベントの有効期限が切れてから平準化が開始される前に、ストレージの使用量が一時的に増加する場合があります。

警告

cloudFiles.maxFileAge は大量のデータセットのためのコスト制御メカニズムとして提供されます。 cloudFiles.maxFileAge のチューニングがアグレッシブすぎると、重複取り込みやファイル欠如など、データ品質の問題を引き起こすことがあります。 そのため、Databricks は cloudFiles.maxFileAge に 90 日間などの控えめな設定を推奨しています。同等のデータ インジェスト ソリューションもこのくらいを推奨しています。

cloudFiles.maxFileAge オプションをチューニングしようとすると、処理されていないファイルが自動ローダーによって無視されたり、既に処理されたファイルの有効期限が切れて再処理され、データが重複する可能性があります。 次に、cloudFiles.maxFileAge を選択する際の考慮事項をいくつか示します。

  • 長い時間が経過した後にストリームが再起動された場合、キューからプルされたファイル通知イベントのうち、cloudFiles.maxFileAge より前のものは無視されます。 同様に、ディレクトリ一覧を使用する場合、ダウンタイム中に表示された可能性がある、cloudFiles.maxFileAge より前のファイルは無視されます。
  • ディレクトリ一覧モードを使用し、cloudFiles.maxFileAge を使用する場合は (例: "1 month" に設定)、ストリームを停止し、cloudFiles.maxFileAge"2 months" に設定してストリームを再起動すると、1 か月より古く、2 か月より新しいファイルは再処理されます。

ストリームを初めて開始するときにこのオプションを設定する場合は、cloudFiles.maxFileAge より前のデータは取り込まれないため、前のデータを取り込む場合、ストリームを初めて開始するときにこのオプションを設定しないでください。 ただし、後続の実行では、このオプションを設定してください。

cloudFiles.backfillInterval を使用して通常のバックフィルをトリガーする

まれに、通知メッセージの保持制限に達したときなど、通知システムのみに依存する場合、ファイルが見落とされたり遅れたりすることがあります。 データの完全性と SLA に関する厳密な要件がある場合は、指定した間隔で非同期バックフィルをトリガーするように cloudFiles.backfillInterval を設定することを検討してください。 たとえば、毎日のバックフィルの場合は 1 日、毎週のバックフィルの場合は 1 週間に設定します。 定期的なバックフィルをトリガーしても、重複は発生しません。

ファイル イベントを使用する場合は、少なくとも 7 日に 1 回はストリームを実行します

ファイル イベントを使用する場合は、ディレクトリの完全な一覧を回避するために、少なくとも 7 日に 1 回、自動ローダー ストリームを実行します。 自動ローダーのストリームを頻繁に実行することで、ファイルの検出が段階的に行われることが確実になります。

包括的なマネージド ファイル イベントのベスト プラクティスについては、「 ファイル イベントを使用した自動ローダーのベスト プラクティス」を参照してください。