次の方法で共有


SharePointからファイルを取り込む

Important

この機能は ベータ版です。 ワークスペース管理者は、[ プレビュー] ページからこの機能へのアクセスを制御できます。 Manage Azure Databricks プレビューを参照してください。

:::note コンプライアンス

SharePoint コネクタは、セキュリティとコンプライアンスの強化設定を構成が有効になっているワークスペースでの使用をサポートします。

:::

構造化ファイル、半構造化ファイル、非構造化ファイルを Microsoft SharePoint から Delta テーブルに取り込むことができます。 SharePoint コネクタでは、バッチ API とストリーミング API (自動ローダー、spark.readCOPY INTOなど) を使用したSharePoint ファイルの増分インジェストがサポートされます。すべて Unity カタログ ガバナンスを使用します。

SharePoint コネクタを選択します

Lakeflow Connect には、2 つの補完的なSharePoint コネクタが用意されています。 どちらもSharePoint内のデータにアクセスしますが、個別の目標をサポートします。

考慮事項 マネージド SharePoint コネクタ Standard SharePoint コネクタ
管理とカスタマイズ フル マネージド コネクタ。
Delta テーブルにデータを取り込み、ソースとの同期を維持するエンタープライズ アプリケーション向けのシンプルでメンテナンスの少ないコネクタ。 Lakeflow Connect のマネージド コネクタを参照してください。
バッチ API とストリーミング API ( read_filesspark.readCOPY INTO、自動ローダーなど) を使用して、SQL、PySpark、または Lakeflow Spark 宣言パイプラインを使用してカスタム インジェスト パイプラインを構築します。
インジェスト中に複雑な変換を柔軟に実行できる一方で、パイプラインの管理と保守に対する責任が高くなります。
出力形式 均一バイナリ コンテンツ テーブル。 各ファイルをバイナリ形式 (1 行に 1 つのファイル) でインジェストし、ファイルメタデータも含めて、
追加の列。
構造化デルタ テーブル。 構造化ファイル (CSV やExcelなど) を Delta テーブルとして取り込みます。 取り込みにも使用できます
バイナリ形式の非構造化ファイル。
粒度、フィルター処理、および選択 現在、サブフォルダーまたはファイル レベルは選択されていません。 パターン ベースのフィルター処理はありません。
指定したSharePoint ドキュメント ライブラリ内のすべてのファイルを取り込みます。
きめ細かくカスタム。
ドキュメント ライブラリ、サブフォルダー、または個々のファイルから取り込む URL ベースの選択。 また、 pathGlobFilter オプションを使用したパターン ベースのフィルター処理もサポートしています。

主な機能

標準SharePoint コネクタには次のものが用意されています。

  • 構造化ファイル、半構造化ファイル、および非構造化ファイルの取り込み
  • 詳細インジェスト: 特定のサイト、サブサイト、ドキュメントライブラリ、フォルダー、または1つのファイルをインジェストする
  • バッチおよびストリーミング取り込みをspark.read、オートローダー、そしてCOPY INTOを使用して行う。
  • CSV やExcelなどの構造化形式と半構造化形式の自動スキーマ推論と進化
  • Unity カタログ接続を使用して資格情報ストレージをセキュリティで保護する
  • パターン マッチングを使用したファイルの選択 pathGlobFilter

Requirements

SharePointからファイルを取り込むには、次のものが必要です。

  • Unity Catalog が有効になっているワークスペース。
  • SharePoint接続を作成するための CREATE CONNECTION 特権、または クラスター アクセス モードに基づいて既存のものを使用するための適切な特権:
    • 専用アクセス モード: MANAGE CONNECTION
    • 標準アクセス モード: USE CONNECTION
  • Databricks Runtime バージョン 17.3 LTS 以降を使用するコンピューティング。
  • Sites.Read.AllまたはSites.Selectedアクセス許可スコープを使用して設定された OAuth 認証。
  • SharePointベータ機能は、Previews ページから有効になります。 Manage Azure Databricks プレビューを参照してください。
  • 省略可能: Excel ファイルを解析するために、Excel ベータ機能を有効にします。 「read Excel files を参照してください。

接続を作成する

SharePoint資格情報を格納する Unity カタログ接続を作成します。 接続のセットアップ プロセスは、標準コネクタとマネージド SharePoint コネクタの両方で共有されます。

OAuth 認証オプションを含む完全な接続セットアップ手順については、「 SharePoint インジェストセットアップの概要を参照してください。

SharePoint からファイルを読み取る

Spark を使用してSharePointからファイルを読み取る場合は、前の手順で作成した接続を databricks.connection データ ソース オプションを使用して指定し、アクセスするSharePoint リソースの URL を指定します。 この URL は、特定のファイル、フォルダー、ドキュメント ライブラリ (ドライブ)、またはサイト全体を参照できます。 たとえば、次のようになります。

  • https://mytenant.sharepoint.com/sites/test-site/
  • https://mytenant.sharepoint.com/sites/test-site/test-subsite
  • https://mytenant.sharepoint.com/sites/test-site/test-drive
  • https://mytenant.sharepoint.com/sites/test-site/Shared%20Documents/Forms/AllItems.aspx
  • https://mytenant.sharepoint.com/sites/test-site/test-drive/test-folder
  • https://mytenant.sharepoint.com/sites/test-site/test-drive/test-folder/test.csv
  • https://mytenant.sharepoint.com/sites/test-site/another-subsite/another-drive/test.csv

例示

標準のSharePoint コネクタを使用してファイルを読み取る方法はいくつかあります。

Stream SharePoint ファイルを自動ローダーでストリームする

自動ローダーは、SharePointから構造化ファイルを増分的に取り込むための最も効率的な方法を提供します。 新しいファイルが自動的に検出され、到着すると処理されます。 また、自動スキーマ推論と進化を使用して、CSV や JSON などの構造化された半構造化ファイルを取り込むこともできます。 自動ローダーの使用方法の詳細については、「 一般的なデータ読み込みパターン」を参照してください。

# Incrementally ingest new PDF files
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("cloudFiles.schemaLocation", <path to a schema location>)
    .option("pathGlobFilter", "*.pdf")
    .load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
)

# Incrementally ingest CSV files with automatic schema inference and evolution
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("pathGlobFilter", "*.csv")
    .option("inferColumnTypes", True)
    .option("header", True)
    .load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs")
)

Spark バッチ読み取りを使用したSharePoint ファイルの読み取り

次の例では、spark.read 関数を使用して、Python内のSharePoint ファイルを取り込む方法を示します。

# Read unstructured data as binary files
df = (spark.read
        .format("binaryFile")
        .option("databricks.connection", "my_sharepoint_conn")
        .option("recursiveFileLookup", True)
        .option("pathGlobFilter", "*.pdf") # optional. Example: only ingest PDFs
        .load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents"))

# Read a batch of CSV files, infer the schema, and load the data into a DataFrame
df = (spark.read
        .format("csv")
        .option("databricks.connection", "my_sharepoint_conn")
        .option("pathGlobFilter", "*.csv")
        .option("recursiveFileLookup", True)
        .option("inferSchema", True)
        .option("header", True)
        .load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs"))

# Read a specific Excel file from SharePoint, infer the schema, and load the data into a DataFrame
df = (spark.read
        .format("excel")
        .option("databricks.connection", "my_sharepoint_conn")
        .option("headerRows", 1)                   # optional
        .option("dataAddress", "Sheet1!A1:M20")  # optional
        .load("https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx"))

Spark SQL を使用したSharePoint ファイルの読み取り

次の例は、read_files テーブル値関数を使用して、SQL SharePoint ファイルを取り込む方法を示しています。 read_filesの使用方法の詳細については、テーブル値関数read_files参照してください。

-- Read pdf files
CREATE TABLE my_table AS
SELECT * FROM read_files(
  "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
  `databricks.connection` => "my_sharepoint_conn",
  format => "binaryFile",
  pathGlobFilter => "*.pdf", -- optional. Example: only ingest PDFs
  schemaEvolutionMode => "none"
);

-- Read a specific Excel sheet and range
CREATE TABLE my_sheet_table AS
SELECT * FROM read_files(
  "https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx",
  `databricks.connection` => "my_sharepoint_conn",
  format => "excel",
  headerRows => 1,  -- optional
  dataAddress => "Sheet1!A2:D10", -- optional
  schemaEvolutionMode => "none"
);

増分インジェストCOPY INTO

COPY INTO は、Delta テーブルへのファイルのべき等増分読み込みを提供します。 COPY INTOの使用方法の詳細については、「COPY INTOを使用した一般的なデータ読み込みパターン」を参照してください。

CREATE TABLE IF NOT EXISTS sharepoint_pdf_table;
CREATE TABLE IF NOT EXISTS sharepoint_csv_table;
CREATE TABLE IF NOT EXISTS sharepoint_excel_table;

# Incrementally ingest new PDF files
COPY INTO sharepoint_pdf_table
  FROM "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents"
  FILEFORMAT = BINARYFILE
  PATTERN = '*.pdf'
  FORMAT_OPTIONS ('databricks.connection' = 'my_sharepoint_conn')
  COPY_OPTIONS ('mergeSchema' = 'true');

# Incrementally ingest CSV files with automatic schema inference and evolution
COPY INTO sharepoint_csv_table
  FROM "https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs"
  FILEFORMAT = CSV
  PATTERN = '*.csv'
  FORMAT_OPTIONS ('databricks.connection' = 'my_sharepoint_conn', 'header' = 'true', 'inferSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

# Ingest a single Excel file
COPY INTO sharepoint_excel_table
  FROM "https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx"
  FILEFORMAT = EXCEL
  FORMAT_OPTIONS ('databricks.connection' = 'my_sharepoint_conn', 'headerRows' = '1')
  COPY_OPTIONS ('mergeSchema' = 'true');

Lakeflow Spark 宣言型パイプラインで SharePoint ファイルを使用する

SharePoint コネクタには、Databricks Runtime 17.3 以降が必要です。 コネクタを使用するには、パイプライン設定で "CHANNEL" = "PREVIEW" を設定します。 プレビューの詳細については、「 パイプラインプロパティリファレンス」を参照してください

次の例では、Lakeflow Spark 宣言パイプラインで自動ローダーを使用してSharePointファイルを読み取る方法を示します。

Python

from pyspark import pipelines as dp

# Incrementally ingest new PDF files
@dp.table
def sharepoint_pdf_table():
  return (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("pathGlobFilter", "*.pdf")
    .load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
  )

# Incrementally ingest CSV files with automatic schema inference and evolution
@dp.table
def sharepoint_csv_table():
  return (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("databricks.connection", "my_sharepoint_conn")
      .option("pathGlobFilter", "*.csv")
      .option("inferColumnTypes", True)
      .option("header", True)
      .load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs")
  )

# Read a specific Excel file from SharePoint in a materialized view
@dp.table
def sharepoint_excel_table():
  return (spark.read.format("excel")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("headerRows", 1)                   # optional
    .option("inferColumnTypes", True)            # optional
    .option("dataAddress", "Sheet1!A1:M20")  # optional
    .load("https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx")

SQL

-- Incrementally ingest new PDF files
CREATE OR REFRESH STREAMING TABLE sharepoint_pdf_table
AS SELECT * FROM STREAM read_files(
  "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
  format => "binaryFile",
  `databricks.connection` => "my_sharepoint_conn",
  pathGlobFilter => "*.pdf");

-- Incrementally ingest CSV files with automatic schema inference and evolution
CREATE OR REFRESH STREAMING TABLE sharepoint_csv_table
AS SELECT * FROM STREAM read_files(
  "https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs",
  format => "csv",
  `databricks.connection` => "my_sharepoint_conn",
  pathGlobFilter => "*.csv",
  "header", "true");

-- Read a specific Excel file from SharePoint in a materialized view
CREATE OR REFRESH MATERIALIZED VIEW sharepoint_excel_table
AS SELECT * FROM read_files(
  "https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx",
  `databricks.connection` => "my_sharepoint_conn",
  format => "excel",
  headerRows => 1,  -- optional
  dataAddress => "Sheet1!A2:D10", -- optional
  `cloudFiles.schemaEvolutionMode` => "none"
);

非構造化ファイルを解析する

標準の SharePoint コネクタと binaryFile 形式を使用して、SharePoint (PDF、Word ドキュメント、PowerPoint ファイルなど) から非構造化ファイルを取り込む場合、ファイルの内容は生のバイナリ データとして格納されます。 RAG、検索、分類、ドキュメントの理解などの AI ワークロード用にこれらのファイルを準備するには、 ai_parse_documentを使用して、バイナリ コンテンツを構造化されたクエリ可能な出力に解析できます。

次の例は、 documents という名前のブロンズ Delta テーブルに格納されている非構造化ドキュメントを解析し、解析されたコンテンツを含む新しい列を追加する方法を示しています。

CREATE TABLE documents AS
SELECT * FROM read_files(
  "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
  `databricks.connection` => "my_sharepoint_conn",
  format => "binaryFile",
  pathGlobFilter => "*.{pdf,docx}",
  schemaEvolutionMode => "none"
);
SELECT *, ai_parse_document(content) AS parsed_content
FROM documents;

parsed_content列には、抽出されたテキスト、テーブル、レイアウト情報、およびダウンストリーム AI パイプラインに直接使用できるメタデータが含まれています。

Lakeflow Spark 宣言パイプラインを使用した増分解析

また、Lakeflow Spark 宣言パイプライン内の ai_parse_document を使用して、増分解析を有効にすることもできます。 新しいファイルがSharePointから流れ込むと、パイプラインの更新時に自動的に解析されます。

たとえば、新しく取り込まれたドキュメントを継続的に解析する具体化されたビューを定義できます。

CREATE OR REFRESH STREAMING TABLE sharepoint_documents_table
AS SELECT * FROM STREAM read_files(
  "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
  format => "binaryFile",
  `databricks.connection` => "my_sharepoint_conn",
  pathGlobFilter => "*.{pdf,docx}");

CREATE OR REFRESH MATERIALIZED VIEW documents_parsed
AS
SELECT *, ai_parse_document(content) AS parsed_content
FROM sharepoint_documents_table;

この方法により、次のことが保証されます。

  • 新しく取り込まれたSharePoint ファイルは、具体化されたビューが更新されるたびに自動的に解析されます
  • 解析された出力と受信データの同期が維持される
  • ダウンストリーム AI パイプラインは、常に最新のドキュメントの表現で動作します。

詳細情報: サポートされている形式と詳細オプションについては、 ai_parse_document を参照してください。

制限事項

標準SharePoint コネクタには、次の制限があります。

  • マルチサイト インジェストなし: 同じクエリを使用して複数のサイトを取り込むことはできません。 2 つのサイトから取り込むには、2 つの個別のクエリを記述する必要があります。
  • フィルター処理: pathGlobFilter オプションを使用して、名前でファイルをフィルター処理できます。 フォルダー パスベースのフィルター処理はサポートされていません。
  • サポートされていない形式: SharePoint リストと .aspx サイトページはサポートされていません。 ドキュメント ライブラリ内のファイルのみがサポートされています。
  • SharePoint サーバーへの書き戻しはサポートされていません。
  • 自動ローダー cleanSource (インジェスト後のソースでのファイルの削除またはアーカイブ) はサポートされていません。

次のステップ