Kom i gang med egendefinerte grafer i Microsoft Sentinel (forhåndsversjon)

Egendefinerte grafer i Microsoft Sentinel gjør det mulig for sikkerhetsforskere og analytikere å opprette skreddersydde grafrepresentasjoner av sikkerhetsdataene. Ved å bygge egendefinerte grafer kan du modellere bestemte angrepsmønstre, undersøke trusler og kjøre avanserte grafalgoritmer for å avdekke skjulte relasjoner i det digitale miljøet. Denne veiledningen veileder deg gjennom trinnene for å opprette og administrere egendefinerte grafer ved hjelp av Jupyter-notatblokker i Microsoft Sentinel Visual Studio Code utvidelse.

Denne artikkelen fokuserer på manuell redigering av egendefinerte grafer ved hjelp av kode. Hvis du vil ha en AI-drevet opplevelse, kan du se ai-assistert egendefinert grafredigering i Microsoft Sentinel.

Forutsetninger

Følgende kreves for å opprette egendefinerte grafer i Microsoft Sentinel:

Aktiver Microsoft Entra ID kobling til å ta inn Microsoft Entra aktivatabeller som brukes i eksempelkoden for denne artikkelen. Hvis du vil ha mer informasjon, kan du se Aktivadatainntak i Microsoft Sentinel datasjøen.

Tillatelser

Hvis du vil samhandle med egendefinerte grafer, trenger du følgende XDR-tillatelser i Sentinel datasjøen. Tabellen nedenfor viser tillatelseskravene for vanlige grafoperasjoner:

Graph-operasjon Tillatelser kreves
Modellere og bygge en notatblokkgraf Bruk en egendefinert Microsoft Defender XDR enhetlig RBAC-rolle med datatillatelser (behandle) over Microsoft Sentinel datainnsamling.
Vedvare et diagram i leieren Bruk én av følgende Microsoft Entra ID roller:
Sikkerhetsoperatør
Sikkerhetsadministrator
Global administrator
Spør etter et fast diagram Bruk en egendefinert Microsoft Defender XDR enhetlig RBAC-rolle med grunnleggende sikkerhetsdatatillatelser (lest) i Microsoft Sentinel datainnsamling.

Viktig

Du må ha tillatelse til å lese dataene som brukes i grafen. Hvis du ikke har tilgang til et bestemt datasett, inkluderes ikke disse dataene i grafen. Hvis du vil opprette en graf, må du ikke begrenses av et Sentinel omfang. En omfangsbruker kan ikke opprette en egendefinert graf.

Microsoft Entra ID roller gir bred tilgang på tvers av alle arbeidsområder i datasjøen. Hvis du vil ha mer informasjon, kan du se Roller og tillatelser i Microsoft Sentinel.

Installer Visual Studio Code og Microsoft Sentinel filtype

Opprett egendefinerte grafer ved hjelp av Jupyter-notatblokker i Microsoft Sentinel Visual Studio Code utvidelse. Hvis du vil ha mer informasjon, kan du se Installer Visual Studio Code og Microsoft Sentinel filtypen

Opprette et egendefinert diagram

Gjør følgende for å opprette og arbeide med egendefinerte grafer:

  1. Modellere et egendefinert diagram
  2. Behold det egendefinerte diagrammet ved å planlegge en grafjobb
  3. Vise og behandle egendefinerte grafer

Modellere et egendefinert diagram

Opprett en egendefinert graf ved hjelp av en Jupyter-notatblokk i Microsoft Sentinel Visual Studio Code-utvidelsen.

Følgende trinn veileder deg gjennom oppretting av den første egendefinerte grafen ved hjelp av en eksempelnotatblokk.

Konfigurere notatblokken og koble til datasjøen

  1. I Visual Studio Code med Microsoft Sentinel-utvidelsen installert, velger du Microsoft Sentinel-ikonet i menyen til venstre.

  2. Velg Logg på for å vise grafer

  3. En dialogboks vises med teksten Filtypen Microsoft Sentinel ønsker å logge på med Microsoft. Velg Tillat å logge på.

  4. Logg på med legitimasjonen din.

  5. Når du har logget på, velger du + Opprett ny notatblokk.

  6. Gi notatblokkfilen et navn, og lagre den på en passende plassering i arbeidsområdet.

    Et skjermbilde av påloggingssiden for grafer i Visual Studio Code.

  7. Velg Velg kjerne øverst til høyre i notatblokkvinduet for å velge et spark-databehandlingsutvalg.

  8. Velg Microsoft Sentinel, og velg deretter et av de tilgjengelige spark-bassengene.

    Et skjermbilde av velg kjernesiden i Visual Studio Code.

    Tips

    Du kan bruke AI-ledetekster for å hjelpe deg med å opprette en egendefinert grafnotatblokk. Hvis du vil ha mer informasjon, kan du se ai-assistert egendefinert grafredigering i Microsoft Sentinel.

  9. Kjør en celle til ved å velge ikonet for kjør celletrekant til venstre for cellen. Første gang du kjører en celle, kan du bli bedt om å velge en kjerne hvis du ikke allerede har valgt en.

    Første gang du kjører en celle, tar det omtrent fem minutter å starte Spark-økten.

    Et skjermbilde som viser kjøringen av den første cellen i Visual Studio Code.

Opprette en graf

Følgende eksempel oppretter en graf for å gå gjennom Microsoft Entra gruppemedlemskap og forstå nestede grupperelasjoner. Eksempelkoden hjelper deg med å komme i gang med et enkelt brukstilfelle for å lære den egendefinerte graffunksjonen og dra nytte av kraften i grafisk traversering for undersøkelsene dine. Du kan opprette en graf fra en hvilken som helst tabell som er tilgjengelig i Sentinel datasjøen.

  1. Koble til arbeidsområdet og les Entra aktivatabeller for å begynne å bygge grafen.

    from pyspark.sql import functions as F
    from sentinel_lake.providers import MicrosoftSentinelProvider
    
    lake_provider = MicrosoftSentinelProvider(spark=spark)
    
    # Use the "System tables" workspace which contains the Entra* Assets tables
    # If you are data is in a different workspace, update this variable accordingly and ensure the tables are present
    LOG_ANALYTICS_WORKSPACE = "System tables"
    
    # Dynamically get the latest snapshot time from EntraUsers
    snapshot_time = (
        lake_provider.read_table("EntraUsers", LOG_ANALYTICS_WORKSPACE)
        .df.agg(F.max("_SnapshotTime").alias("max_snapshot"))
        .collect()[0]["max_snapshot"]
        .strftime("%Y-%m-%dT%H:%M:%SZ")
    )
    print(f"Using snapshot_time: {snapshot_time}")
    
    snapshot_filter = (F.col("_SnapshotTime") == F.lit(snapshot_time).cast("timestamp"))
    
    # Load EntraMembers - edges: group contains user/group/servicePrincipal
    df_members = (
        lake_provider.read_table("EntraMembers", LOG_ANALYTICS_WORKSPACE)
        .filter(
            snapshot_filter
            & (F.col("sourceType") == "group")
            & (F.col("targetType").isin("user", "group", "servicePrincipal"))
        )
    )
    
    # Load EntraGroups - nodes
    df_groups = (
        lake_provider.read_table("EntraGroups", LOG_ANALYTICS_WORKSPACE)
        .filter(snapshot_filter)
        .select("id", "displayName", "mailEnabled")
    )
    
    # Load EntraUsers - nodes
    df_users = (
        lake_provider.read_table("EntraUsers", LOG_ANALYTICS_WORKSPACE)
        .filter(snapshot_filter)
        .select("id", "accountEnabled", "displayName", "department",
                "lastPasswordChangeDateTime", "userPrincipalName", "usageLocation")
    )
    
    # Load EntraServicePrincipals - nodes
    df_service_principals = (
        lake_provider.read_table("EntraServicePrincipals", LOG_ANALYTICS_WORKSPACE)
        .filter(snapshot_filter)
        .select("accountEnabled", "id", "displayName", "servicePrincipalType",
                "tenantId", "organizationId")
    )
    
    # Fix for Spark 3.x Parquet datetime rebase issue. Required when reading Parquet files
    # written by Spark 2.x which used the Julian calendar, whereas Spark 3.x uses Proleptic
    # Gregorian. Without these settings, timestamp columns (e.g. lastPasswordChangeDateTime)
    # may throw errors or return incorrect values. Safe to remove if all data was written by
    # Spark 3.x (typical for current Fabric/Sentinel environments).
    spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")
    spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    spark.conf.set("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
    spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
    
  2. Klargjøre noden og edge-datarammene som kreves for å bygge grafen

    # ============================================================
    # NODE PREPARATION
    # ============================================================
    
    # EntraUser nodes - keyed by user id
    user_nodes = (
        df_users.df
        .select(
            F.col("id"),
            F.col("displayName"),
            F.col("accountEnabled"),
            F.col("department"),
            F.col("lastPasswordChangeDateTime"),
            F.col("userPrincipalName"),
            F.col("usageLocation")
        )
    )
    
    # EntraGroup nodes - keyed by group id
    group_nodes = (
        df_groups.df
        .select(
            F.col("id"),
            F.col("displayName"),
            F.col("mailEnabled")
        )
    )
    
    # EntraServicePrincipal nodes - keyed by SP id
    sp_nodes = (
        df_service_principals.df
        .select(
            F.col("id"),
            F.col("displayName"),
            F.col("accountEnabled"),
            F.col("servicePrincipalType"),
            F.col("tenantId"),
            F.col("organizationId")
        )
    )
    
    # ============================================================
    # EDGE PREPARATION
    # ============================================================
    
    # Edge: EntraGroup --Contains--> EntraUser
    edge_group_contains_user = (
        df_members.df
        .filter(F.col("targetType") == "user")
        .select(
            F.col("sourceId").alias("SourceGroupId"),
            F.col("targetId").alias("TargetUserId")
        )
        .distinct()
        .withColumn("EdgeKey", F.concat_ws("_", F.col("SourceGroupId"), F.col("TargetUserId")))
    )
    
    # Edge: EntraGroup --Contains--> EntraGroup (nested groups)
    edge_group_contains_group = (
        df_members.df
        .filter(F.col("targetType") == "group")
        .select(
            F.col("sourceId").alias("SourceGroupId"),
            F.col("targetId").alias("TargetGroupId")
        )
        .distinct()
        .withColumn("EdgeKey", F.concat_ws("_", F.col("SourceGroupId"), F.col("TargetGroupId")))
    )
    
    # Edge: EntraGroup --Contains--> EntraServicePrincipal
    edge_group_contains_sp = (
        df_members.df
        .filter(F.col("targetType") == "servicePrincipal")
        .select(
            F.col("sourceId").alias("SourceGroupId"),
            F.col("targetId").alias("TargetSPId")
        )
        .distinct()
        .withColumn("EdgeKey", F.concat_ws("_", F.col("SourceGroupId"), F.col("TargetSPId")))
    )
    
  3. Definer grafskjemaet og bind til DataFrames som ble opprettet i forrige trinn

    from sentinel_graph import GraphSpecBuilder, Graph
    
    # Define the graph schema 
    
    entra_group_graph_spec = (
        GraphSpecBuilder.start()
    
        # === NODES ===
    
        .add_node("EntraUser")
        .from_dataframe(user_nodes)  # Native Spark DataFrame (from .df + .select + .distinct)
        .with_columns(
            "id", "displayName", "accountEnabled",
            "department", "lastPasswordChangeDateTime", "userPrincipalName", "usageLocation",
            key="id", display="displayName"
        )
    
        .add_node("EntraGroup")
        .from_dataframe(group_nodes)  # Native Spark DataFrame
        .with_columns(
            "id", "displayName", "mailEnabled",
            key="id", display="displayName"
        )
    
        .add_node("EntraServicePrincipal")
        .from_dataframe(sp_nodes)  # Native Spark DataFrame
        .with_columns(
            "id", "displayName", "accountEnabled",
            "servicePrincipalType", "tenantId", "organizationId",
            key="id", display="displayName"
        )
    
        # === EDGES ===
    
        # EntraGroup --ContainsUser--> EntraUser
        .add_edge("ContainsUser")
        .from_dataframe(edge_group_contains_user)  # Native Spark DataFrame
        .source(id_column="SourceGroupId", node_type="EntraGroup")
        .target(id_column="TargetUserId", node_type="EntraUser")
        .with_columns("SourceGroupId", "TargetUserId", "EdgeKey",
                      key="EdgeKey", display="EdgeKey")
    
        # EntraGroup --ContainsGroup--> EntraGroup (nested groups)
        .add_edge("ContainsGroup")
        .from_dataframe(edge_group_contains_group)  # Native Spark DataFrame
        .source(id_column="SourceGroupId", node_type="EntraGroup")
        .target(id_column="TargetGroupId", node_type="EntraGroup")
        .with_columns("SourceGroupId", "TargetGroupId", "EdgeKey",
                      key="EdgeKey", display="EdgeKey")
    
        # EntraGroup --ContainsServicePrincipal--> EntraServicePrincipal
        .add_edge("ContainsServicePrincipal")
        .from_dataframe(edge_group_contains_sp)  # Native Spark DataFrame
        .source(id_column="SourceGroupId", node_type="EntraGroup")
        .target(id_column="TargetSPId", node_type="EntraServicePrincipal")
        .with_columns("SourceGroupId", "TargetSPId", "EdgeKey",
                      key="EdgeKey", display="EdgeKey")
    
    ).done()
    
  4. Validere grafskjemaet

    # Check the schema of the graph spec to ensure it's correct
    entra_group_graph_spec.show_schema()
    
  5. Bygg grafen, inkludert klargjøring av dataene og publisering av grafen

    # Build the graph from the spec - this will validate the spec and prepare it for querying
    # Alter options is to use Graph.prepare() to prepare the graph nodes and edges in the lake
    # and then use Graph.publish() to create the graph. You would typically call prepare() and publish()
    # seperately to understand the cost of Graph API calls that are triggeterd by Graph.publish()
    # see https://learn.microsoft.com/azure/sentinel/billing?tabs=simplified%2Ccommitment-tiers
    intra_group_graph = Graph.build(entra_group_graph_spec)
    

    Obs!

    Grafer som opprettes under interaktive notatblokkøkter, fjernes når notatblokkøkten lukkes. Hvis du vil beholde grafen for gjenbruk og deling, kan du se Persist den egendefinerte grafen

Du har nå opprettet en graf i notatblokken.

Hvis du vil vise en visuell fremstilling av grafen, limer du inn og kjører følgende kode i en ny celle:

# Query 1: Find nested group relationships nexting up to 8 levels deep
# Update the Entra Group name that you want to traverse from
query_nested_groups = """
MATCH p=(g1:EntraGroup)-[cg]->{1,8}(g2)
WHERE g1.displayName = 'tmplevel3'
RETURN *
"""
intra_group_graph.query(query_nested_groups).show()

Denne koden kjører en eksempelspørringsspørring (GQL) for å hente alle nestede gruppemedlemskap opptil åtte nivåer dypt Den resulterende grafen visualiseres i utdataene

Et skjermbilde som viser visualiseringen av en graf i Visual Studio Code.

Fasthold den egendefinerte grafen

Når du har opprettet grafkoden i notatblokken, kan du kjøre notatblokken i en interaktiv økt eller planlegge en grafjobb. Grafer som opprettes under den interaktive notatblokkøkten, er midlertidige og er bare tilgjengelige i konteksten til notatblokkøkten. Hvis du vil lagre grafen og dele den med teamet, kan du planlegge en grafjobb for å gjenoppbygge grafen ofte. Når grafen er lagret, er den tilgjengelig fra: grafopplevelsen i Microsoft Defender portalen under Sentinel, Visual Studio Code notatblokker og Graph-spørrings-API-er.

  1. Velg Opprett planlagt jobb fra grafnotatblokken, og velg deretter Opprett en grafjobb.

    Et skjermbilde som viser knappen Opprett planlagt jobb i en grafnotatblokk.

  2. Skriv inn graph-navnet og -beskrivelsen i jobbskjemaet Opprett graf, og kontroller at riktig grafnotatblokk er inkludert i Bane.

  3. Hvis du vil bygge grafen uten å konfigurere en oppdateringsplan, velger du Ved behov i Tidsplan-delen , og deretter velger du Send for å opprette grafen.

    Obs!

    Grafer som opprettes ved bruk av behovsplan, har standardoppbevaring på 30 dager og slettes ved utløp.

  4. Hvis du vil bygge grafen der grafdataene oppdateres regelmessig, velger du Planlagt i Tidsplan-delen .

    1. Velg en gjentakelsesfrekvens for jobben. Du kan velge blant minutt,time,ukentlig, daglig eller månedlig.

    2. Flere alternativer vises for å konfigurere tidsplanen, avhengig av hyppigheten du velger. For eksempel dag i uken, klokkeslettet eller dagen i måneden.

    3. Velg en Start i tide for at tidsplanen skal begynne å kjøre.

    4. Velg et sluttidspunkt for at tidsplanen skal slutte å kjøre. Hvis du ikke vil angi et sluttidspunkt for tidsplanen, velger du Angi at jobben skal kjøres i det uendelige. Datoer og klokkeslett er i tidssonen.

    5. Velg Send for å lagre jobbkonfigurasjonen og publisere jobben. Diagrambyggingsprosessen starter i leieren. Vis den nyopprettede grafen og den nyeste statusen i Sentinel filtypen.

    Et skjermbilde av siden Opprett grafjobb.

Vise og behandle egendefinerte grafer

Når du har opprettet en grafjobb, kan du vise og administrere grafen i leieren fra Microsoft Sentinel-utvidelsen i Visual Studio Code.

  1. Velg den materialiserte grafen fra listen over grafer for å vise detaljene.

  2. Velg Jobbdetaljer-fanen for å vise statusen for grafjobben, inkludert siste kjøretid, neste kjøretid og eventuelle feil som oppstod under byggeprosessen.

  3. Velg Kjør nå for å manuelt utløse et diagrambygg utenfor de planlagte tidspunktene. Statusen endres til , deretter Pågår mens grafen bygges.

    Et skjermbilde som viser fanen med diagramjobbdetaljer i Visual Studio Code.

  4. Når grafbygget er fullført, oppdateres Status til Ready. Velg Diagramdetaljer-fanen for å vise informasjon om grafen.

    Et skjermbilde av fanen med diagramdetaljer.

  5. Nå kan du spørre og visualisere grafen fra grafvisualiseringen i Microsoft Sentinel i Defender-portalen. Hvis du vil ha mer informasjon, kan du se Visualiser grafer i Microsoft Sentinel graf (forhåndsvisning).