Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
I den här självstudien skapar du en Microsoft Fabric notebook som extraherar data från flera semantiska Power BI-modeller med hjälp av REST-API:et Execute DAX Queries. Du deserialiserar Arrow IPC-svar i Pandas DataFrames, jämför och kombinerar modellutdata och sammanfogar resultat stegvis till en Delta-tabell i OneLake.
Det här mönstret är utformat för dataforskare och analystekniker som behöver extrahering med högt dataflöde med låg parsning.
Varför det här mönstret fungerar
Jämfört med JSON-baserad extrahering minskar Arrow IPC processor- och minneskostnaderna på klientsidan eftersom du undviker upprepad JSON-parsning och objektmaterialisering. Du kan läsa Arrow-buffertar direkt till en tabellrepresentation i minnet och konvertera till Pandas med färre steg för transformering.
När du bevarar resultatuppsättningar stegvis till Delta undviker du även fullständiga tabellomskrivningar. Den här metoden hjälper till att minska användningen av kapacitetsenheter (CU) samtidigt som nedströms Direct Lake-scenarier hålls aktuella.
Det här skapar du
I en Fabric-anteckningsbok:
- Anropa två semantiska modeller med DAX.
- Materialisera varje svar som en Pandas DataFrame.
- Jämför eller kombinera DataFrames.
- Sammanfoga ändringar inkrementellt i en Delta-tabell.
- Kontrollera att Direct Lake-konsumenter kan hämta uppdaterade data.
Förutsättningar
En arbetsyta för Fabric eller Premium-kapacitet.
Minst två semantiska modeller som du vill jämföra eller kombinera.
Skapa och läsa behörigheter för varje semantisk modell.
En Fabric-notebook som är kopplad till ett lakehouse där du kan skapa och uppdatera Delta-tabeller.
Python-paket:
%pip install msal requests pyarrow pandasKlientinställningar aktiverade:
- REST-API för utförande av datamängdsfrågor.
- Tillåt tjänsthuvudmän att använda Power BI API om du endast använder appautentisering.
Fabric notebook-flöde
Notebooken utför följande steg:
- Hämta en åtkomsttoken.
- Kör DAX mot flera semantiska modeller.
- Deserialisera Arrow-svar till Pandas DataFrames.
- Normalisera scheman och jämför eller kombinera DataFrames.
- Sammanfoga resultat inkrementellt till en Delta-tabell.
- Verifiera datatillgänglighet för Direct Lake-förbrukning.
1 – Hämta en Entra ID-token för den aktuella användaren
I den första kodcellen definierar du semantiska modellmål och hämtar en token.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 – Köra DAX-frågor mellan semantiska modeller
Definiera en hjälp som kör DAX och returnerar en Pandas DataFrame från Arrow IPC.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
I nästa kodcell kör du en modellspecifik DAX-fråga för varje modell och tagg proveniens:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 – Jämföra och kombinera DataFrames
Normalisera nyckelkolumner och jämför sedan modellutdata eller kombinera dem till en enda analysuppsättning.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 – Stegvis sammanslagning till en Delta-tabell
Använd en Delta-sammanslagning som är kopplad till affärskolumner. Det här mönstret uppdaterar ändrade rader och infogar nya rader utan att skriva om den fullständiga tabellen.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Tips/Råd
För mycket stora extraheringsfönster partitioneras Delta-måltabellen efter datum och process i avgränsade sektorer. Den här metoden förbättrar sammanslagningseffektiviteten och hjälper till att kontrollera CU-användningen.
5 – Verifiera Direct Lake-beredskap
Bekräfta att Delta-tabellen är uppdaterad och frågebar:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
När Delta-tabellen har uppdaterats kan Direct Lake-semantiska modeller som refererar till tabellen hämta nya data genom normalt synkroniseringsbeteende.
Föreslagen layout för Fabric notebookcell
Använd den här celllayouten för att hålla arbetsflödet underhållsbart:
- Markdown-cell: scenario, modell-ID och tabellmål.
- Python cell: paketimporter och tokenförvärv.
- Python cell: hjälp för DAX-exekvering.
- Python cell: extrahera data från varje semantisk modell.
- Python cell: jämföra/kombinera Pandas DataFrames.
- Python cell: skriv staging-DataFrame till Spark och kör Delta
MERGE. - Python cell: verifiera radantal och senaste tidsstämplar för extrahering.
Prestandavägledning
- Behåll DAX-omfånget endast till obligatoriska kolumner och rader.
- Använd
resultsetRowcountLimitoch DAX-filter för bundna extraheringsfönster. - Prioritera inkrementella hopslagningar framför fullständiga uppdateringar.
- Återanvänd en enskild MSAL-klient och tokencache per notebook-session.
- Välj Pil från slutpunkt till slutpunkt för extrahering för att undvika JSON-parsningskostnader i Python.
- Spåra varaktighet för extrahering, nyttolaststorlek och sammanslagningsvaraktighet som driftmått.
Troubleshooting
- 401 Obehörig: Verifiera tenant, klientautentiseringsuppgifter och åtkomstomfattning.
- HTTP 429: Lägg till återförsök med exponentiell fördröjning och jitter.
- Schemaavvikelse mellan modeller: Normalisera kolumnnamn och datatyper före sammanslagning.
- Stor minnesanvändning i Pandas: Processmodellen matas ut i batchar eller aggregeras i DAX före extrahering.
Anmärkning
Om anroparen inte har tillräckliga behörigheter misslyckas frågan, men HTTP-svaret är fortfarande 200 OK. Granska svarstexten för felspecifikationer.