この記事では、ログ インジェスト API を使用し、Azure Log Analytics の宛先としての Fabric Apache Spark 診断エミッターについて説明します。
Fabric Apache Spark 診断エミッターは、宛先間で Spark 診断用の共通構成モデルを提供します。 Azure Log Analyticsの場合、Log Ingestion API が推奨されるインジェスト モデルです。
この記事では、エミッタ プロパティを構成し、Apache Spark ログ、イベント ログ、メトリックをLog Analyticsにルーティングし、取り込まれたデータにクエリを実行して監視とトラブルシューティングを行う方法について説明します。
Fabric Apache Spark 診断エミッターでのアーキテクチャと宛先の選択については、「Fabric Apache Spark Diagnostic Emitter の概要を参照してください。
データ コレクター API からの移行
HTTP データ コレクター API は非推奨です。 Log Ingestion API に移行して、中断を回避し、現在の Azure Monitor の取り込みパターンに合わせます。
新しいモデルの主な変更点:
- スキーマ定義は 、データ収集規則 (DCR) を通じて明示的に行われます。これにより、以前の自由形式のペイロード アプローチよりも、予測可能なスキーマ検証と一貫性のあるクエリ結果が得られます。
- インジェストは 、データ 収集エンドポイント (DCE) と DCR マッピングを介してルーティングされます。これにより、データ コレクター API エンドポイントに直接投稿するよりも、より制御されたインジェスト パスが提供されます。
- 認証では、サービス プリンシパル クライアント シークレット と 証明書ベースのオプションの両方が サポートされます。
- エミッタタイプは
AzureLogAnalyticsからAzureLogIngestionに変更されます。
移行には、通常、DCR リソースと DCE リソースの作成、Fabric環境の Spark プロパティの更新、カスタム Log Analytics テーブルへのデータ インジェストの検証が含まれます。
ログ インジェスト API の概要
Microsoft Fabricの Apache Spark 診断の場合、Log Ingestion API には、Azure Log Analyticsでの認証、スキーマ定義、ルーティング、テーブル配信のための構造化インジェスト モデルが用意されています。
主なコンポーネント
| コンポーネント | Purpose |
|---|---|
| アプリの登録資格情報 | クライアント シークレットまたは証明書を使用して、ログ インジェスト API 要求を認証するためのMicrosoft EntraのアプリIDを提供します。 |
| ログアナリティクステーブル | クエリと監視のために取り込まれた Spark 診断が格納されるターゲット カスタム テーブルを提供します。 |
| データ収集規則 (DCR) | インジェスト用の入力ストリーム、スキーマ マッピング、およびオプションの変換を定義します。 |
| データ収集エンドポイント (DCE) | DCR ベースのルーティングを介してデータを送信するためにクライアントによって使用されるインジェスト エンドポイント URI (dceUri) を提供します。 |
プログラムによるインジェストには、ログ インジェスト API 用に構成されたユーザー作成の DCR のみを使用できます。
ステップ バイ ステップの構成
ステップ 1. Log Analytics ワークスペースを準備する
Spark 診断を受信するには、Log Analytics ワークスペースが必要です。 これは、Azure Monitor Logs の基本的なストレージとクエリユニットです。
お持ちでない場合は、Azure ポータルでLog Analytics ワークスペースを作成します。
Important
次の手順を完了したら、Log Analytics ワークスペースと同じリージョンに、データ収集エンドポイント (DCE) リソースとデータ収集規則 (DCR) リソースを作成します。
ステップ 2. データ収集エンドポイント (DCE) を作成する
Azure ポータルでデータ収集エンドポイント (DCE) を作成します。 DCE は、ログ インジェスト API の Spark プロパティで構成するエンドポイント URI を提供します。 DCE のリージョンは、Log Analytics ワークスペースのリージョンと同じである必要があります。
Azure ポータルで、左側のナビゲーション ウィンドウの Monitor に移動します。
[設定] で、[データ収集エンドポイント] を選択し、[作成] を選択します。
エンドポイントを作成し、DCE 名 (たとえば、
DCEdemo) を書き留めます。
手順 3. サンプル JSON スキーマを準備する
カスタム ログ テーブルを作成するときは、データ収集規則 (DCR) を構成する必要があります。 DCR で指定されたデータ ストリーム定義に基づいて、Log Analytics ワークスペースに対応するテーブル スキーマが自動的に生成されます。
次の定義済みの JSON スキーマでは、各サンプルが特定のデータ型にマップされます。 シナリオに合ったサンプルをダウンロードし、関連付けられているカスタム テーブルと DCR を作成するときにアップロードします。
- Spark イベント ログ - イベント テーブル JSON スキーマのサンプル
- Spark ドライバーと Executor ログ - ログ テーブルの JSON スキーマサンプル
- Spark メトリック - メトリック テーブル JSON スキーマのサンプル
- プラットフォーム メタデータ - プラットフォーム メタデータ テーブル JSON スキーマのサンプル
Azure Log Analyticsの Spark ドライバーと Executor ログのログ テーブル JSON スキーマサンプルの例を次に示します。 ログ インジェスト用のカスタム テーブルと DCR を作成する場合は、このスキーマを参照として使用します。
[
{
"applicationId_s": "<APPLICATION_ID>",
"applicationName_s": "<NOTEBOOK_NAME>",
"artifactId_g": "<ARTIFACT_GUID>",
"artifactType_s": "SynapseNotebook",
"capacityId_g": "<CAPACITY_GUID>",
"Category": "Log",
"executorId_s": "driver",
"executorMax_s": 9,
"executorMin_s": 1,
"ExtraFields": {
"Category": "Log",
"JobId": "1"
},
"fabricEnvId_g": "<FABRIC_ENV_GUID>",
"fabricLivyId_g": "<FABRIC_LIVY_GUID>",
"fabricTenantId_g": "<FABRIC_TENANT_GUID>",
"fabricWorkspaceId_g": "<FABRIC_WORKSPACE_GUID>",
"isHighConcurrencyEnabled_s": false,
"Level": "INFO",
"logger_name_s": "org.apache.spark.scheduler.dynalloc.ExecutorMonitor",
"Message": "Executor 1 is removed.",
"thread_name_s": "spark-listener-group-executorManagement",
"TimeGenerated": "<TIME_GENERATED>",
"userId_g": "<USER_ID>"
}
]
ステップ 4. カスタム テーブルの作成 (直接取り込み)
ログ インジェスト API オプションを使用して、Log Analytics ワークスペースにカスタム テーブルを作成し、関連付けられている DCR に JSON スキーマ サンプルをアップロードします。 この手順は、Spark 診断の宛先を設定し、取り込まれたデータが予想されるスキーマに準拠していることを確認するために必要です。 インジェストを成功させるには、Log Analytics ワークスペース、DCE、DCR のリージョンが同じである必要があります。
Azure portal で、Log Analytics ワークスペース (たとえば、loganalyticsworkspacedemo) を開きます。
[テーブル>作成>新しいカスタム ログ (直接取り込み) を選択します。
テーブルの設定を入力します。
- テーブル名: たとえば、SparkLogTest (サフィックス "_CL" が自動的に追加されます)。
- テーブル プラン: アナリティクス
- データ収集規則: 新しい DCR ( SparkLogTestrule など) を作成します。
- データ収集エンドポイント: データ収集エンドポイントの作成 (DCE) ステップ (DCEdemo など) から DCE を選択します。
次へを選択します。
[スキーマと変換] で、JSON スキーマ サンプルをアップロードします。 スキーマはクライアント側で完全に安定しているため、DCR 変換を構成する必要はありません。
ステップ 5. 認証用のサービス プリンシパルを準備する
Microsoft Entra IDにアプリを登録します。
TenantId、ClientId、ClientSecret を記録します (クライアント シークレット認証を使用する場合)。 これらの値は、手順 6 の Spark 構成で使用します。
各テーブルの DCR リソースにMonitoring Metrics Publisher ロールをアプリに付与します。 ロールの割り当て手順については、「Azure ポータルを使用して Azure ロールを割り当てる」を参照してください。
ステップ 6. Spark プロパティを構成する
Spark を構成するには、Fabricで環境を作成し、次のいずれかの認証オプションを選択します。 特定のエミッタに対して 1 つのオプションのみを使用します。
Fabricの環境には、ノートブックと Spark ジョブ定義が実行時に使用する Spark 設定とライブラリが格納されます。 作成する手順については、「 Fabric で環境を作成、構成、および使用する」を参照してください。
- クライアント シークレットを使用して簡単にセットアップする場合は、 オプション 1 を選択します。
- 組織で証明書ベースの認証とAzure Key Vaultでの一元化された証明書管理が必要な場合は、Option 2 を選択します。
どちらのオプションでも、環境内の .ymlから [追加] を選択して、 .yml 構成ファイルをインポートできます。
オプション 1: サービス プリンシパルとクライアント シークレットを使用して構成する
このオプションは、サービス プリンシパルの資格情報とクライアント シークレットを使用して簡単にセットアップする場合に使用します。
Fabricで環境を作成します。
適切な値を使用して次の Spark プロパティ を環境に追加するか、リボン の.ymlから [追加] を選択して
.yml構成ファイルをインポートします。spark.synapse.diagnostic.emitters: <EMITTER_NAME> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.type: AzureLogIngestion spark.synapse.diagnostic.emitter.<EMITTER_NAME>.categories: DriverLog,ExecutorLog,EventLog,Metrics spark.synapse.diagnostic.emitter.<EMITTER_NAME>.dceUri: https://<DCE_NAME>.<REGION>.ingest.monitor.azure.com spark.synapse.diagnostic.emitter.<EMITTER_NAME>.logDcr: <LOG_DCR_ID> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.logStream: <LOG_STREAM_NAME> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.eventDcr: <EVENT_DCR_ID> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.eventStream: <EVENT_STREAM_NAME> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metricDcr: <METRIC_DCR_ID> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metricStream: <METRIC_STREAM_NAME> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metaDcr: <META_DCR_ID> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metaStream: <META_STREAM_NAME> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.secret: <SP_CLIENT_SECRET> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.tenantId: <SP_TENANT_ID> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.clientId: <SP_CLIENT_ID> spark.fabric.pools.skipStarterPools: 'true'変更を保存して公開します。
オプション 2: サービス プリンシパル証明書認証を使用して構成する
組織で証明書ベースの認証が必要な場合は、このオプションを使用します。
開始する前に、サービス プリンシパルが証明書で作成されていることを確認します。 詳細については、「
Fabricで環境を作成します。
適切な値を使用して次の Spark プロパティ を環境に追加するか、リボン の.ymlから [追加] を選択して
.yml構成ファイルをインポートします。spark.synapse.diagnostic.emitters: <EMITTER_NAME> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.type: AzureLogIngestion spark.synapse.diagnostic.emitter.<EMITTER_NAME>.categories: DriverLog,ExecutorLog,EventLog,Metrics spark.synapse.diagnostic.emitter.<EMITTER_NAME>.dceUri: https://<DCE_NAME>.<REGION>.ingest.monitor.azure.com spark.synapse.diagnostic.emitter.<EMITTER_NAME>.logDcr: <LOG_DCR_ID> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.logStream: <LOG_STREAM_NAME> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.eventDcr: <EVENT_DCR_ID> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.eventStream: <EVENT_STREAM_NAME> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metricDcr: <METRIC_DCR_ID> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metricStream: <METRIC_STREAM_NAME> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metaDcr: <META_DCR_ID> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metaStream: <META_STREAM_NAME> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault.certificateName: <SP_CERT-NAME> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault: https://<KEYVAULT_NAME>.vault.azure.net/ spark.synapse.diagnostic.emitter.<EMITTER_NAME>.tenantId: <SP_TENANT_ID> spark.synapse.diagnostic.emitter.<EMITTER_NAME>.clientId: <SP_CLIENT_ID> spark.fabric.pools.skipStarterPools: 'true'変更を保存して公開します。
ステップ 7. 環境をノートブックまたは Spark ジョブ定義にアタッチするか、ワークスペースの既定値として設定する
スコープに基づいて、次のいずれかの方法を使用します。
- ターゲットロールアウト、テスト、または項目ごとの制御が必要な場合は、特定のノートブックまたは Spark ジョブ定義に環境をアタッチします。
- ワークスペース全体に一貫した Spark 診断設定を適用する場合は、環境をワークスペースの既定値として設定します。
環境をノートブックまたは Spark ジョブ定義にアタッチするには:
- Fabricでノートブックまたは Spark ジョブ定義に移動します。
- [ホーム] タブ上の [環境] メニューを選択し、構成済みの環境を選択します。
- この構成は Spark セッションの開始後に適用されます。
ワークスペースの既定値として環境を設定するには:
- Fabricのワークスペース設定に移動します。
- ワークスペース設定 (ワークスペース設定>Data Engineering/Science>Spark 設定) で Spark 設定を検索します。
- [ 環境 ] タブを選択し、診断 Spark プロパティが構成されている環境を選択し、[ 保存] を選択します。
手順 8. Spark ワークロードを実行し、ログとメトリックを確認する
前のセクションで作成してアタッチした環境を使用し、Spark ワークロードを実行し、Log Analyticsでインジェストを確認します。
- 前のセクションで構成した環境を使用して Spark ワークロードを実行します。 次のいずれかの方法を使用できます。
- Fabricでノートブックを実行します。
- Spark ジョブ定義を使用して Spark バッチ ジョブを送信します。
- パイプラインで Spark アクティビティを実行します。
- ターゲット Log Analytics ワークスペースを開き、実行中のワークロードに対してログとメトリックが取り込まれたことを確認します。
- インジェストを検証してレコードを検査するには、Kusto を使用した データのクエリで Kusto の例を使用します。
カスタム アプリケーション ログを書き込む
プラットフォーム診断に加えて、ビジネス レベルまたはアプリ固有のイベントが必要な場合は、カスタム アプリケーション ログを使用します。 これらのログは、同じ診断パイプラインを介して出力され、Spark ログ、イベント ログ、メトリックと共にLog Analyticsに表示されます。
Spark コードで Apache Log4j を使用して、カスタム ログ メッセージを出力します。 次の例は、Scala と PySpark の最小パターンを示しています。
Scala の例:
%%spark
val logger = org.apache.log4j.LogManager.getLogger("com.contoso.LoggerExample")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")
//log exception
try {
1/0
} catch {
case e:Exception =>logger.warn("Exception", e)
}
// run job for task level metrics
val data = sc.parallelize(Seq(1,2,3,4)).toDF().count()
PySpark の例:
%%pyspark
logger = sc._jvm.org.apache.log4j.LogManager.getLogger("com.contoso.PythonLoggerExample")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")
Kusto を使用してデータのクエリを実行する
Kusto クエリを使用して、インジェストが機能していることを検証し、Spark の実行動作を調査します。
{FabricWorkspaceId}、{ArtifactId}、{LivyId}などのプレースホルダーの値を、独自の実行の値に置き換えます。
イベント クエリとログ クエリから始めてデータの到着を確認し、パフォーマンス分析にメトリック クエリを使用します。
Apache Spark イベントのクエリを実行するには:
SparkEventTest_CL
| where fabricWorkspaceId_g == "{FabricWorkspaceId}" and artifactId_g == "{ArtifactId}" and fabricLivyId_g == "{LivyId}"
| order by TimeGenerated desc
| limit 100
Spark アプリケーション ドライバーと Executor ログのクエリを実行するには:
SparkLogTest_CL
| where fabricWorkspaceId_g == "{FabricWorkspaceId}" and artifactId_g == "{ArtifactId}" and fabricLivyId_g == "{LivyId}"
| order by TimeGenerated desc
| limit 100
Apache Spark メトリックのクエリを実行するには:
SparkMetricsTest_CL
| where fabricWorkspaceId_g == "{FabricWorkspaceId}" and artifactId_g == "{ArtifactId}" and fabricLivyId_g == "{LivyId}"
| where name_s endswith "jvm.total.used"
| summarize max(value_d) by bin(TimeGenerated, 30s), executorId_s
| order by TimeGenerated asc
プラットフォーム メタデータに対してクエリを実行するには:
SparkMetadataTest_CL
| where fabricWorkspaceId_g == "{FabricWorkspaceId}" and artifactId_g == "{ArtifactId}" and fabricLivyId_g == "{LivyId}"
| order by TimeGenerated desc
| limit 100
マネージド仮想ネットワークを使用してワークスペースをFabricする
Fabricワークスペースのデータ流出保護の有効化をサポートしています。 流出保護では、ログとメトリックを宛先エンドポイントに直接送信することはできません。 このシナリオでは、異なる宛先 エンドポイントに対応するマネージド プライベート エンドポイントを作成できます。
使用可能な Apache Spark 構成
次の表に、ログ インジェスト API を使用してAzure Log Analyticsにログとメトリックを送信するための Spark 構成を示します。
Important
Azure Log Analyticsの場合は、spark.synapse.diagnostic.emitter.<EMITTER_NAME>.type を AzureLogIngestion に設定します。
AzureLogAnalytics は、従来の HTTP データ コレクター API の種類です。 従来のガイダンスについては、Monitor Apache Spark アプリケーションと Azure Log Analytics を参照してください。
| コンフィギュレーション | 説明 |
|---|---|
spark.synapse.diagnostic.emitters |
診断エミッタのコンマ区切りの宛先名。 たとえば、 MyDest1、MyDest2。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.type |
組み込みの宛先の種類。 ログ インジェスト API を使用してAzure Log Analyticsを有効にするには、この値を AzureLogIngestion に設定します。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.categories |
コンマ区切りの選択されたログ カテゴリ。 指定できる値には、DriverLog、ExecutorLog、EventLog、Metrics が含まれます。 設定しない場合、既定値はすべてのカテゴリです。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.dceUri |
データ収集規則 (DCR) を使用してデータをルーティングするときにインジェストに使用されるデータ収集エンドポイント (DCE) URI。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.logDcr |
Spark ログを宛先にルーティングするために使用されるデータ収集規則 (DCR) リソース ID。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.logStream |
Spark ログのデータ収集規則 (DCR) で定義されているストリーム名。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.eventDcr |
Spark イベント ログのルーティングに使用されるデータ収集規則 (DCR) リソース ID。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.eventStream |
Spark イベント ログのデータ収集規則 (DCR) で定義されているストリーム名。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metricDcr |
Spark メトリックのルーティングに使用されるデータ収集規則 (DCR) リソース ID。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metricStream |
Spark メトリックのデータ収集規則 (DCR) で定義されているストリーム名。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metaDcr |
Spark メタデータのルーティングに使用されるデータ収集規則 (DCR) リソース ID。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.metaStream |
Spark メタデータのデータ収集規則 (DCR) で定義されているストリーム名。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault.certificateName |
認証に使用される、Azure Key Vaultに格納されている証明書の名前。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault |
認証証明書を格納するAzure Key Vault URI。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.tenantId |
認証に使用されるMicrosoft Entra テナント ID。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.clientId |
Microsoft Entra IDに登録されているクライアント (アプリケーション) ID。 |
spark.fabric.pools.skipStarterPools |
この Spark プロパティは、オンデマンドの Spark セッションを強制的に実行するために使用されます。 既定のプールを使用してライブラリをトリガーしてログとメトリックを出力する場合は、値を true に設定します。 |
spark.synapse.diagnostic.emitter.<EMITTER_NAME>.secret |
Microsoft Entra ID (Azure AD) アプリケーションに関連付けられているクライアント シークレット。診断データの送信時にエミッターを認証するためにテナント ID とクライアント ID と共に使用されます。 この設定は、証明書ベースの認証と相互に排他的であり、クライアント シークレットまたは証明書のどちらかを構成しますが、両方を構成することはできません。 |
関連するコンテンツ
- Apache Spark ジョブ定義の作成
Microsoft Fabric - Microsoft Fabric のノートブックを開発、実行、管理する
- Spark アプリケーションを監視する
Azure Event Hubs - Azure Storage アカウントを使用して Apache Spark 診断を収集する