Microsoft Sentinelでのカスタム グラフの使用 (プレビュー)

Microsoft Sentinelのカスタム グラフを使用すると、セキュリティ研究者やアナリストは、セキュリティ データのカスタマイズされたグラフ表現を作成できます。 カスタム グラフを構築することで、特定の攻撃パターンをモデル化し、脅威を調査し、高度なグラフ アルゴリズムを実行して、デジタル環境内の隠された関係を明らかにすることができます。 このガイドでは、Microsoft Sentinel Visual Studio Code 拡張機能で Jupyter ノートブックを使用してカスタム グラフを作成および管理する手順について説明します。

この記事では、コードを使用してカスタム グラフを手動で作成することに焦点を当てます。 AI 主導のエクスペリエンスについては、Microsoft Sentinelでの AI 支援カスタム グラフ作成に関するページを参照してください。

前提条件

Microsoft Sentinelでカスタム グラフを作成するには、次のものが必要です。

この記事のサンプル コードで使用するMicrosoft Entra資産テーブルを取り込むには、Microsoft Entra ID コネクタを有効にします。 詳細については、「Microsoft Sentinel データ レイクでの資産データ インジェスト」を参照してください。

アクセス許可

カスタム グラフを操作するには、データ レイクで次の XDR アクセス許可Sentinel必要があります。 次の表に、一般的なグラフ操作のアクセス許可の要件を示します。

グラフ操作 必要なアクセス許可
ノートブック グラフをモデル化して構築する Microsoft Sentinel データ コレクションに対するデータ (管理) アクセス許可を持つカスタム Microsoft Defender XDR統合 RBAC ロールを使用します。
テナントでグラフを保持する 次のいずれかのMicrosoft Entra IDロールを使用します。
セキュリティ オペレーター
セキュリティ管理者
全体管理者
永続化されたグラフのクエリ Microsoft Sentinel データコレクションに対するセキュリティ データの基本 (読み取り) アクセス許可を持つカスタム Microsoft Defender XDR統合 RBAC ロールを使用します。

重要

グラフで使用されるデータを読み取るアクセス許可が必要です。 特定のデータセットにアクセスできない場合、そのデータはグラフに含まれません。 グラフを作成するには、Sentinelスコープによって制限しないでください。 スコープ付きユーザーは、カスタム グラフを作成できません。

Microsoft Entra IDロールは、データ レイク内のすべてのワークスペースに広範なアクセスを提供します。 詳細については、「Microsoft Sentinelのロールとアクセス許可」を参照してください。

Visual Studio Code と Microsoft Sentinel 拡張機能をインストールする

Microsoft Sentinel Visual Studio Code 拡張機能で Jupyter ノートブックを使用してカスタム グラフを作成します。 詳細については、「Visual Studio Code と Microsoft Sentinel 拡張機能のインストール」を参照してください。

カスタム グラフを作成する

カスタム グラフを作成して操作するには、次の手順を実行します。

  1. カスタム グラフをモデル化する
  2. グラフ ジョブをスケジュールしてカスタム グラフを保持する
  3. カスタム グラフの表示と管理

カスタム グラフをモデル化する

Microsoft Sentinel Visual Studio Code 拡張機能で Jupyter ノートブックを使用してカスタム グラフを作成します。

次の手順では、サンプル ノートブックを使用して最初のカスタム グラフを作成する手順について説明します。

ノートブックを設定し、Data Lake に接続する

  1. Microsoft Sentinel拡張機能がインストールされている Visual Studio Code で、左側のメニューの [Microsoft Sentinel] アイコンを選択します。

  2. [サインイン] を選択してグラフを表示する

  3. ダイアログ ボックスが表示され、Microsoft を使用してサインインする拡張機能 'Microsoft Sentinel' というテキストが表示されます。 [ 許可] を選択 してサインインします。

  4. 資格情報を使用してサインインします。

  5. サインインした後、[ + ] を選択し、[ 新しいノートブックの作成] を選択します。

  6. ノートブック ファイルに名前を付け、ワークスペース内の適切な場所に保存します。

    Visual Studio Code のグラフのサインイン ページのスクリーンショット。

  7. ノートブック ウィンドウの右上にある [ カーネルの選択 ] を選択して、Spark コンピューティング プールを選択します。

  8. [Microsoft Sentinel] を選択し、使用可能な Spark プールのいずれかを選択します。

    Visual Studio Code の [カーネルの選択] ページのスクリーンショット。

    ヒント

    AI プロンプトを使用すると、カスタム グラフ ノートブックの作成に役立ちます。 詳細については、「Microsoft Sentinelでの AI 支援カスタム グラフ作成」を参照してください。

  9. セルの左側にある実行セルの三角形アイコンを選択して、セルを に実行します。 セルを初めて実行するときに、まだカーネルを選択していない場合は、カーネルの選択を求められる場合があります。

    セルを初めて実行するときは、Spark セッションを開始するのに約 5 分かかります。

    Visual Studio Code の最初のセルの実行を示すスクリーンショット。

グラフを作成する

次の例では、グループ メンバーシップMicrosoft Entra走査し、入れ子になったグループリレーションシップを理解するグラフを作成します。 サンプル コードは、カスタム グラフ機能を学習し、調査のためにグラフ トラバーサルの力を活用するための簡単なユース ケースの使用を開始するのに役立ちます。 Sentinel データ レイクで使用できる任意のテーブルからグラフを作成できます。

  1. ワークスペースに接続し、Entraアセット テーブルを読み取ってグラフの作成を開始します。

    from pyspark.sql import functions as F
    from sentinel_lake.providers import MicrosoftSentinelProvider
    
    lake_provider = MicrosoftSentinelProvider(spark=spark)
    
    # Use the "System tables" workspace which contains the Entra* Assets tables
    # If you are data is in a different workspace, update this variable accordingly and ensure the tables are present
    LOG_ANALYTICS_WORKSPACE = "System tables"
    
    # Dynamically get the latest snapshot time from EntraUsers
    snapshot_time = (
        lake_provider.read_table("EntraUsers", LOG_ANALYTICS_WORKSPACE)
        .df.agg(F.max("_SnapshotTime").alias("max_snapshot"))
        .collect()[0]["max_snapshot"]
        .strftime("%Y-%m-%dT%H:%M:%SZ")
    )
    print(f"Using snapshot_time: {snapshot_time}")
    
    snapshot_filter = (F.col("_SnapshotTime") == F.lit(snapshot_time).cast("timestamp"))
    
    # Load EntraMembers - edges: group contains user/group/servicePrincipal
    df_members = (
        lake_provider.read_table("EntraMembers", LOG_ANALYTICS_WORKSPACE)
        .filter(
            snapshot_filter
            & (F.col("sourceType") == "group")
            & (F.col("targetType").isin("user", "group", "servicePrincipal"))
        )
    )
    
    # Load EntraGroups - nodes
    df_groups = (
        lake_provider.read_table("EntraGroups", LOG_ANALYTICS_WORKSPACE)
        .filter(snapshot_filter)
        .select("id", "displayName", "mailEnabled")
    )
    
    # Load EntraUsers - nodes
    df_users = (
        lake_provider.read_table("EntraUsers", LOG_ANALYTICS_WORKSPACE)
        .filter(snapshot_filter)
        .select("id", "accountEnabled", "displayName", "department",
                "lastPasswordChangeDateTime", "userPrincipalName", "usageLocation")
    )
    
    # Load EntraServicePrincipals - nodes
    df_service_principals = (
        lake_provider.read_table("EntraServicePrincipals", LOG_ANALYTICS_WORKSPACE)
        .filter(snapshot_filter)
        .select("accountEnabled", "id", "displayName", "servicePrincipalType",
                "tenantId", "organizationId")
    )
    
    # Fix for Spark 3.x Parquet datetime rebase issue. Required when reading Parquet files
    # written by Spark 2.x which used the Julian calendar, whereas Spark 3.x uses Proleptic
    # Gregorian. Without these settings, timestamp columns (e.g. lastPasswordChangeDateTime)
    # may throw errors or return incorrect values. Safe to remove if all data was written by
    # Spark 3.x (typical for current Fabric/Sentinel environments).
    spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")
    spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    spark.conf.set("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
    spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
    
  2. グラフの作成に必要なノードとエッジの DataFrame を準備する

    # ============================================================
    # NODE PREPARATION
    # ============================================================
    
    # EntraUser nodes - keyed by user id
    user_nodes = (
        df_users.df
        .select(
            F.col("id"),
            F.col("displayName"),
            F.col("accountEnabled"),
            F.col("department"),
            F.col("lastPasswordChangeDateTime"),
            F.col("userPrincipalName"),
            F.col("usageLocation")
        )
    )
    
    # EntraGroup nodes - keyed by group id
    group_nodes = (
        df_groups.df
        .select(
            F.col("id"),
            F.col("displayName"),
            F.col("mailEnabled")
        )
    )
    
    # EntraServicePrincipal nodes - keyed by SP id
    sp_nodes = (
        df_service_principals.df
        .select(
            F.col("id"),
            F.col("displayName"),
            F.col("accountEnabled"),
            F.col("servicePrincipalType"),
            F.col("tenantId"),
            F.col("organizationId")
        )
    )
    
    # ============================================================
    # EDGE PREPARATION
    # ============================================================
    
    # Edge: EntraGroup --Contains--> EntraUser
    edge_group_contains_user = (
        df_members.df
        .filter(F.col("targetType") == "user")
        .select(
            F.col("sourceId").alias("SourceGroupId"),
            F.col("targetId").alias("TargetUserId")
        )
        .distinct()
        .withColumn("EdgeKey", F.concat_ws("_", F.col("SourceGroupId"), F.col("TargetUserId")))
    )
    
    # Edge: EntraGroup --Contains--> EntraGroup (nested groups)
    edge_group_contains_group = (
        df_members.df
        .filter(F.col("targetType") == "group")
        .select(
            F.col("sourceId").alias("SourceGroupId"),
            F.col("targetId").alias("TargetGroupId")
        )
        .distinct()
        .withColumn("EdgeKey", F.concat_ws("_", F.col("SourceGroupId"), F.col("TargetGroupId")))
    )
    
    # Edge: EntraGroup --Contains--> EntraServicePrincipal
    edge_group_contains_sp = (
        df_members.df
        .filter(F.col("targetType") == "servicePrincipal")
        .select(
            F.col("sourceId").alias("SourceGroupId"),
            F.col("targetId").alias("TargetSPId")
        )
        .distinct()
        .withColumn("EdgeKey", F.concat_ws("_", F.col("SourceGroupId"), F.col("TargetSPId")))
    )
    
  3. グラフ スキーマを定義し、前の手順で作成した DataFrame にバインドする

    from sentinel_graph import GraphSpecBuilder, Graph
    
    # Define the graph schema 
    
    entra_group_graph_spec = (
        GraphSpecBuilder.start()
    
        # === NODES ===
    
        .add_node("EntraUser")
        .from_dataframe(user_nodes)  # Native Spark DataFrame (from .df + .select + .distinct)
        .with_columns(
            "id", "displayName", "accountEnabled",
            "department", "lastPasswordChangeDateTime", "userPrincipalName", "usageLocation",
            key="id", display="displayName"
        )
    
        .add_node("EntraGroup")
        .from_dataframe(group_nodes)  # Native Spark DataFrame
        .with_columns(
            "id", "displayName", "mailEnabled",
            key="id", display="displayName"
        )
    
        .add_node("EntraServicePrincipal")
        .from_dataframe(sp_nodes)  # Native Spark DataFrame
        .with_columns(
            "id", "displayName", "accountEnabled",
            "servicePrincipalType", "tenantId", "organizationId",
            key="id", display="displayName"
        )
    
        # === EDGES ===
    
        # EntraGroup --ContainsUser--> EntraUser
        .add_edge("ContainsUser")
        .from_dataframe(edge_group_contains_user)  # Native Spark DataFrame
        .source(id_column="SourceGroupId", node_type="EntraGroup")
        .target(id_column="TargetUserId", node_type="EntraUser")
        .with_columns("SourceGroupId", "TargetUserId", "EdgeKey",
                      key="EdgeKey", display="EdgeKey")
    
        # EntraGroup --ContainsGroup--> EntraGroup (nested groups)
        .add_edge("ContainsGroup")
        .from_dataframe(edge_group_contains_group)  # Native Spark DataFrame
        .source(id_column="SourceGroupId", node_type="EntraGroup")
        .target(id_column="TargetGroupId", node_type="EntraGroup")
        .with_columns("SourceGroupId", "TargetGroupId", "EdgeKey",
                      key="EdgeKey", display="EdgeKey")
    
        # EntraGroup --ContainsServicePrincipal--> EntraServicePrincipal
        .add_edge("ContainsServicePrincipal")
        .from_dataframe(edge_group_contains_sp)  # Native Spark DataFrame
        .source(id_column="SourceGroupId", node_type="EntraGroup")
        .target(id_column="TargetSPId", node_type="EntraServicePrincipal")
        .with_columns("SourceGroupId", "TargetSPId", "EdgeKey",
                      key="EdgeKey", display="EdgeKey")
    
    ).done()
    
  4. グラフ スキーマを検証する

    # Check the schema of the graph spec to ensure it's correct
    entra_group_graph_spec.show_schema()
    
  5. データの準備やグラフの公開など、グラフを作成する

    # Build the graph from the spec - this will validate the spec and prepare it for querying
    # Alter options is to use Graph.prepare() to prepare the graph nodes and edges in the lake
    # and then use Graph.publish() to create the graph. You would typically call prepare() and publish()
    # seperately to understand the cost of Graph API calls that are triggeterd by Graph.publish()
    # see https://learn.microsoft.com/azure/sentinel/billing?tabs=simplified%2Ccommitment-tiers
    intra_group_graph = Graph.build(entra_group_graph_spec)
    

    注:

    対話型ノートブック セッション中に作成されたグラフは、ノートブック セッションが閉じられると削除されます。 再利用と共有のためにグラフを保持するには、「カスタム グラフを保持する」を参照してください。

これで、ノートブックにグラフが作成されました。

グラフの視覚的な表現を表示するには、新しいセル貼り付けで次のコードを実行します。

# Query 1: Find nested group relationships nexting up to 8 levels deep
# Update the Entra Group name that you want to traverse from
query_nested_groups = """
MATCH p=(g1:EntraGroup)-[cg]->{1,8}(g2)
WHERE g1.displayName = 'tmplevel3'
RETURN *
"""
intra_group_graph.query(query_nested_groups).show()

このコードでは、サンプルの Graph クエリ言語 (GQL) クエリを実行して、最大 8 レベルまでの入れ子になったグループ メンバーシップを取得します。結果のグラフは出力で視覚化されます

Visual Studio Code でのグラフの視覚化を示すスクリーンショット。

カスタム グラフを保持する

ノートブックでグラフ コードを作成したら、対話型セッションでノートブックを実行するか、グラフ ジョブをスケジュールできます。 対話型ノートブック セッション中に作成されたグラフは一時的なもので、ノートブック セッションのコンテキストでのみ使用できます。 グラフを保存してチームと共有するには、グラフジョブをスケジュールしてグラフを頻繁に再構築します。 グラフを保存すると、Sentinel、Visual Studio Code Notebook、Graph クエリ API の下にあるMicrosoft Defender ポータルのグラフ エクスペリエンスからアクセスできます。

  1. グラフ ノートブックから [ スケジュールされたジョブの作成] を選択し、[ グラフ ジョブの作成] を選択します。

    グラフ ノートブックの [スケジュールされたジョブの作成] ボタンを示すスクリーンショット。

  2. [ グラフ ジョブの作成 ] フォームで、[ グラフ名 ] と [説明] を入力し、[パス] に正しいグラフ ノートブックが含まれていることを確認 します

  3. 更新スケジュールを構成せずにグラフを作成するには、[スケジュール] セクションで [オンデマンド] を選択し、[送信] を選択してグラフを作成します。

    注:

    オンデマンド スケジュールを使用して作成されたグラフの既定の保持期間は 30 日で、有効期限が切れると削除されます。

  4. グラフ データが定期的に更新されるグラフを作成するには、[スケジュール] セクションで [スケジュール] を選択します。

    1. ジョブの 繰り返し頻度 を選択します。 [ 分単位]、[ 時間単位]、[ 週単位]、[ 毎日]、[ 月単位] から選択できます。

    2. 選択した頻度に応じて、スケジュールを構成するためのその他のオプションが表示されます。 たとえば、曜日、時刻、月の日などです。

    3. スケジュールが実行を開始するための [ 開始時刻 ] を選択します。

    4. 実行を停止するスケジュールの [期限内の終了] を選択します。 スケジュールの終了時刻を設定しない場合は、[ジョブの設定] を選択 して無期限に実行します。 日付と時刻はタイムゾーンにあります。

    5. [ 送信] を 選択してジョブ構成を保存し、ジョブを発行します。 グラフ作成プロセスは、テナントで開始されます。 Sentinel拡張機能で、新しく作成されたグラフとその最新の状態を表示します。

    グラフ ジョブの作成ページのスクリーンショット。

カスタム グラフの表示と管理

グラフ ジョブを作成したら、Visual Studio Code のMicrosoft Sentinel拡張機能からテナントのグラフを表示および管理できます。

  1. グラフの一覧から、具体化されたグラフを選択して詳細を表示します。

  2. [ ジョブの詳細 ] タブを選択すると、最後の実行時、次回の実行時、ビルド プロセス中に発生したエラーなど、グラフ ジョブの状態が表示されます。

  3. [ 今すぐ実行 ] を選択して、スケジュールされた時間外にグラフ ビルドを手動でトリガーします。 状態は、グラフの作成中に [キューに入っている] に変わり、[進行中] に変わります。

    Visual Studio Code の [グラフ ジョブの詳細] タブを示すスクリーンショット。

  4. グラフのビルドが完了すると、[ 状態][準備完了] に更新されます。 [グラフの詳細] タブを選択して、グラフに関する情報を表示します。

    グラフの詳細タブのスクリーンショット。

  5. Defender ポータルのMicrosoft Sentinelのグラフ視覚化からグラフのクエリを実行して視覚化できるようになりました。 詳細については、「Microsoft Sentinel グラフでのグラフの視覚化 (プレビュー)」を参照してください。