Data som strömmas till ett sjöhus med Spark

Strukturerad strömning är en skalbar, feltolerant strömbearbetningsmotor som bygger på Spark. Den behandlar en livedataström som en tabell som nya rader läggs till kontinuerligt i. Strukturerad direktuppspelning stöder inbyggda filkällor som CSV, JSON, ORC och Parquet, tillsammans med meddelandetjänster som Kafka och Azure Event Hubs.

Den här artikeln beskriver hur du konfigurerar en strömmande källa, till exempel Azure Event Hubs, matar in strömmande data i en Lakehouse Delta-tabell, optimerar skrivprestanda med partitionering och händelsebatchbearbetning och kör strömningsjobb på ett tillförlitligt sätt i produktionen.

Konfigurera en strömmande källa

Om du vill strömma data till ett lakehouse konfigurerar du först en anslutning till din strömningskälla. Azure Event Hubs är ett vanligt val. Använd Azure Event Hubs Connector för Apache Spark för att ansluta ditt Spark-program till Azure Event Hubs.

En grundläggande Event Hubs-konfiguration kräver Namnområdesnamn för Event Hubs, hubbnamn, nyckelnamn för delad åtkomst och konsumentgrupp.

En konsumentgrupp är en vy över en hel händelsehubb. Konsumentgrupper möjliggör för flera konsumerande applikationer att var och en ha en separat vy över händelseströmmen och att läsa strömmen i sin egen takt med sina egna offset-positioner.

Med partitioner i Event Hubs kan du bearbeta stora mängder händelser parallellt. En enskild processor har en begränsad kapacitet för att hantera händelser per sekund, medan flera processorer kan fungera parallellt mellan partitioner.

Om för många partitioner används med låg inmatningshastighet hanterar partitionsläsare en liten del av data, vilket orsakar icke-optimal bearbetning. Det ideala antalet partitioner beror på önskad bearbetningshastighet. När du ökar antalet dataflödesenheter i namnområdet kanske du vill ha extra partitioner så att samtidiga läsare kan uppnå sitt maximala dataflöde.

Testa det bästa antalet partitioner för ditt dataflödesscenario. Scenarier med högt dataflöde använder vanligtvis 32 eller fler partitioner.

Deltatabell som en direktuppspelningsmottagare

Delta Lake är ett lagringslager med öppen källkod som möjliggör ACID-transaktioner (atomicitet, konsekvens, isolering och hållbarhet) på toppen av data lake-lagring. I Fabric Data Engineering stöder Delta Lake upserts, datakomprimering, tidsresa, schemautveckling och lagring i öppet format.

Med delta som utdataformat i writeStreamflödar strömmande data direkt till en Delta-tabell. Följande exempel läser från Event Hubs, parsar meddelandetexten och skriver till en Delta-tabell:

import pyspark.sql.functions as f
from pyspark.sql.types import *

df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

Schema = StructType([
    StructField("<column_name_01>", StringType(), False),
    StructField("<column_name_02>", StringType(), False),
    StructField("<column_name_03>", DoubleType(), True),
    StructField("<column_name_04>", LongType(), True),
    StructField("<column_name_05>", LongType(), True),
])

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .toTable("deltaeventstable")
)

I koden format("delta") anger du Delta som utdataformat, outputMode("append") skriver bara nya rader till tabellen och toTable("deltaeventstable") bevarar strömmade data till en hanterad Delta-tabell.

Optimera strömningsprestanda

När grundläggande strömningsinmatning fungerar kan du förbättra dataflödet och filorganisationen med optimeringsteknikerna i följande avsnitt.

Partitioneringsdata för skrivningar

Partitionera dina data effektivt för att optimera dataflödet. Partitionering förbättrar både skrivdataflöde och nedströms frågeprestanda. Du kan partitionera data i minnet, på disken eller både och.

På disk – Används partitionBy() för att organisera data i underkataloger baserat på kolumnvärden. Välj kolumner med bra kardinalitet som ger optimal storlek på filer. Undvik kolumner som skapar för många små partitioner eller för få stora.

I minnet – Använd repartition() eller coalesce() för att distribuera data mellan arbetsnoder innan du skriver:

  • repartition() ökar eller minskar partitioner med en komplett omfördelning och balanserar data jämnt.
  • coalesce() minskar bara partitioner, vilket minimerar dataförflyttningen.

Att kombinera båda metoderna fungerar bra för scenarier med högt dataflöde. I följande exempel delas data upp i 48 partitioner i minnet (matchning av tillgängliga CPU-kärnor) och partitioner på disk med två kolumner:

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .repartition(48)
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .partitionBy("<column_name_01>", "<column_name_02>")
    .toTable("deltaeventstable")
)

Använda optimerad skrivning

Som ett alternativ till manuell partitionering sammanfogar eller delar optimerad skrivning partitioner innan du skriver, vilket maximerar diskdataflödet utan manuella repartition() eller coalesce() anrop. Aktivera den med en Spark-konfiguration:

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)

Med Optimerad skrivning aktiverat kan du ta bort repartition() eller coalesce() från koden och låta Spark hantera partitionsstorlek. Du kan fortfarande använda partitionBy() för organisationen på disknivå.

Batchhändelser med utlösare

För att ytterligare optimera skrivprestanda, batchhändelser innan du skriver dem till disk. Som standard bearbetar Spark varje mikrobatch så snart den föregående har slutförts. Om du anger ett triggerintervall ackumuleras data under en tidsperiod och skrivs i färre, större operationer. Större batchar ger större Delta-filer och minskar kostnaderna för små filer.

I följande exempel bearbetas händelser i intervall på en minut:

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .partitionBy("<column_name_01>", "<column_name_02>")
    .trigger(processingTime="1 minute")
    .toTable("deltaeventstable")
)

Analysera volymen av inkommande data och välj ett bearbetningsintervall som producerar välstora Parquet-filer i Delta-tabellen.

Köra direktuppspelningsjobb i produktion

Spark Notebooks är ett effektivt verktyg för att utveckla och testa strömningslogik. För produktionsarbetsbelastningar som behöver köras kontinuerligt använder du dock Spark-jobbdefinitioner i stället. Spark-jobbdefinitioner är icke-interaktiva, kodorienterade uppgifter som körs på ett Spark-kluster och ger större robusthet och tillgänglighet.

Infrastrukturen som kör ett direktuppspelningsjobb kan stöta på problem som stoppar jobbet, till exempel maskinvarufel eller infrastrukturkorrigering. En återförsöksprincip startar automatiskt om jobbet när det oväntat stoppas. Konfigurera återförsöksprincipen för en Spark-jobbdefinition för att ange hur många gånger jobbet ska startas om (upp till oändliga återförsök) och tidsintervallet mellan återförsök. När en återförsöksprincip är aktiverad fortsätter strömningsjobbet att köras tills du uttryckligen stoppar det.

Övervakningshubben för infrastrukturresurser innehåller en flik för strukturerad direktuppspelning med mått som indatafrekvens, processfrekvens, indatarader, batchvaraktighet och åtgärdsvaraktighet.