Condividi tramite


Esercitazione: Creare una pipeline geospaziale con tipi spaziali nativi

Scopri come creare e distribuire una pipeline che inserisce dati GPS, converte le coordinate in tipi spaziali nativi ed esegue join con le geofence del magazzino per tenere traccia degli arrivi utilizzando Lakeflow Spark Declarative Pipelines (SDP) per l'orchestrazione dei dati e Auto Loader. Questa esercitazione usa i tipi spaziali nativi di Databricks (GEOMETRY, GEOGRAPHY) e le funzioni spaziali predefinite, ad ST_Pointesempio , ST_GeomFromWKTe ST_Contains, in modo da poter eseguire flussi di lavoro geospaziali su larga scala senza librerie esterne.

In questa esercitazione si eseguiranno le seguenti attività:

  • Creare una pipeline e generare dati di esempio GPS e geofencing in un volume di Unity Catalog.
  • Inserire ping GPS non elaborati in modo incrementale con Auto Loader in una tabella di streaming bronze.
  • Creare una tabella silver streaming che converte la latitudine e la longitudine in un punto nativo GEOMETRY.
  • Creare una vista materializzata dei recinti virtuali del magazzino dai poligoni WKT.
  • Eseguire un join spaziale per produrre una tabella degli arrivi al magazzino (quale dispositivo è entrato in quale recinto virtuale).

Il risultato è una pipeline in stile medaglione: bronzo (GPS grezzo), argento (punti come geometria) e oro (georecinzioni e eventi di arrivo). Per ulteriori informazioni, vedere Che cos'è l'architettura del lakehouse medallion?

Requisiti

Per completare questa esercitazione, è necessario soddisfare i requisiti seguenti:

Passaggio 1: creare una pipeline

Creare una nuova pipeline ETL e impostare il catalogo e lo schema predefiniti per le tabelle.

  1. Nell'area di lavoro fare clic sull'icona Più. Novità nell'angolo superiore sinistro.

  2. Fare clic su Pipeline ETL.

  3. Modificare il titolo della pipeline in Spatial pipeline tutorial o in un nome a tua scelta.

  4. Sotto il titolo scegliere un catalogo e uno schema per cui si dispone delle autorizzazioni di scrittura.

    Questo catalogo e schema vengono usati per impostazione predefinita quando non si specifica un catalogo o uno schema nel codice. Sostituire <catalog> e <schema> nei passaggi seguenti con i valori scelti qui.

  5. In Opzioni avanzate selezionare Inizia con un file vuoto.

  6. Scegliere una cartella per il codice. È possibile selezionare Sfoglia per scegliere una cartella; è possibile usare una cartella Git per il controllo della versione.

  7. Scegliere Python o SQL per il linguaggio del primo file. È possibile aggiungere file nell'altra lingua in un secondo momento.

  8. Fare clic su Seleziona per creare la pipeline e aprire l'Editor pipeline di Lakeflow.

È ora disponibile una pipeline vuota con un catalogo e uno schema predefiniti. Creare quindi i dati GPS e geofence di esempio.

Passaggio 2: Creare i dati di esempio per GPS e georecinto

Questo passaggio genera dati di esempio in un volume: ping GPS non elaborati (JSON) e georecinzioni del magazzino (JSON con poligoni WKT). I punti GPS vengono generati in una scatola di delimitazione che si sovrappone ai due poligoni dei magazzini, quindi il join spaziale in un passaggio successivo restituirà le righe relative all'arrivo. È possibile ignorare questo passaggio se si dispone già di dati personalizzati in un volume o in una tabella.

  1. Nel browser asset dell'editor di Lakeflow Pipelines fare clic sull'icona Più.Aggiungi, quindi Esplora.

  2. Impostare Nome su Setup spatial data, scegliere Python e lasciare la cartella di destinazione predefinita.

  3. Clicca su Crea.

  4. Nel nuovo notebook incollare il codice seguente. Sostituire <catalog> e <schema> con il catalogo e lo schema predefiniti impostati nel passaggio 1.

    Usare il codice seguente nel notebook per generare dati GPS e geofence.

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. Esegui la cella del notebook (Maiusc + Invio).

Al termine dell'esecuzione, il volume contiene gps (ping non elaborati) e geofences (poligoni in WKT). Nel passaggio successivo si inseriscono i dati GPS in una tabella bronze.

Passaggio 3: Trasferire i dati GPS in una tabella di streaming di livello bronze

Inserire il codice JSON GPS non elaborato dal volume in modo incrementale usando Auto Loader e scrivere in una tabella di streaming bronze.

  1. Nel browser delle risorse premi sull'icona Più.Aggiungi, quindi Trasformazione.

  2. Impostare Nome su gps_bronze, scegliere SQL o Python e fare clic su Crea.

  3. Sostituire il contenuto del file con il codice seguente (usare la scheda corrispondente alla lingua in uso). Sostituire <catalog> e <schema> con il catalogo e lo schema predefiniti.

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. Fare clic sull'icona Riproduci.Eseguire il file o eseguire la pipeline per eseguire un aggiornamento.

Al termine dell'aggiornamento, il grafico della pipeline mostra la tabella gps_bronze. Successivamente, aggiungere una tabella silver che converte le coordinate in un punto di geometria nativa.

Passaggio 4: Aggiungere una tabella di streaming silver con punti geometrici

Creare una tabella di streaming che legge dalla tabella bronze e aggiunge una GEOMETRY colonna usando ST_Point(longitude, latitude).

  1. Nel browser delle risorse, fare clic sull'icona del Plus.Aggiungi, quindi Trasformazione.

  2. Impostare Nome su raw_gps_silver, scegliere SQL o Python e fare clic su Crea.

  3. Incollare il codice seguente nel nuovo file.

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. Fare clic sull'icona Play;Esegui il file o Esegui la pipeline.

Il grafico della pipeline ora mostra gps_bronze e raw_gps_silver. Successivamente, aggiungere i recinti virtuali del magazzino come vista materializzata.

Passaggio 5: Creare la tabella gold dei recinti virtuali del magazzino

Creare una vista materializzata che legge i georecinti dal volume e converte la colonna WKT in colonna GEOMETRY usando ST_GeomFromWKT.

  1. Nel browser delle risorse, fare clic sull'icona del Plus.Aggiungi, quindi Trasformazione.

  2. Impostare Nome su warehouse_geofences_gold, scegliere SQL o Python e fare clic su Crea.

  3. Incollare il codice seguente. Sostituire <catalog> e <schema> con il catalogo e lo schema predefiniti.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. Fare clic sull'icona Play;Esegui il file o Esegui la pipeline.

La pipeline include ora la tabella recinti virtuali. Quindi, aggiungi il join spaziale per calcolare gli arrivi al magazzino.

Passaggio 6: Creare la tabella degli arrivi del magazzino con un join spaziale

Aggiungere una vista materializzata che colleghi i punti GPS silver alle geofence utilizzando ST_Contains(boundary_geom, point_geom) per determinare quando un dispositivo si trova all'interno di un poligono del magazzino.

  1. Nel browser delle risorse, fare clic sull'icona del Plus.Aggiungi, quindi Trasformazione.

  2. Impostare Nome su warehouse_arrivals, scegliere SQL o Python e fare clic su Crea.

  3. Incollare il codice seguente.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. Fare clic sull'icona Play;Esegui il file o Esegui la pipeline.

Al termine dell'aggiornamento, il grafico della pipeline mostra tutti e quattro i set di dati: gps_bronze, raw_gps_silverwarehouse_geofences_gold, e warehouse_arrivals.

Verificare l'unione spaziale

Verificare che il join spaziale produca righe: i punti della tabella silver che rientrano in un geofence vengono visualizzati in warehouse_arrivals. Eseguire una delle operazioni seguenti in un notebook o in un editor SQL (usare lo stesso catalogo e schema della destinazione della pipeline).

Conteggio degli arrivi per magazzino (SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Dovrebbero essere visualizzati conteggi diversi da zero per Warehouse_A e Warehouse_B (i dati GPS di esempio si sovrappongono a entrambi i poligoni). Per esaminare le righe di esempio:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Stessi controlli in Python (notebook):

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

Se vengono visualizzate righe in warehouse_arrivals, il ST_Contains(boundary_geom, point_geom) join funziona correttamente.

Passaggio 7: Pianificare la pipeline (facoltativo)

Per mantenere aggiornata la pipeline man mano che i nuovi dati GPS arrivano nel volume, creare un processo per eseguire la pipeline in base a una pianificazione.

  1. Nella parte superiore dell'editor scegliere il pulsante Pianifica .
  2. Se viene visualizzata la finestra di dialogo Pianificazioni , scegliere Aggiungi pianificazione.
  3. Facoltativamente, dare un nome al job.
  4. Per impostazione predefinita, la pianificazione viene eseguita una volta al giorno. È possibile accettarlo o impostarne uno personalizzato. Scelta avanzata consente di impostare un orario specifico; Altre opzioni consentono di aggiungere notifiche di esecuzione.
  5. Selezionare Crea per applicare la pianificazione.

Vedere Monitoraggio e osservabilità per i processi Lakeflow per ulteriori informazioni sulle esecuzioni dei processi.

Risorse aggiuntive