Utiliser le connecteur d’ingestion Zerobus

Cette page explique comment ingérer des données à l’aide du connecteur d’ingestion Zerobus dans Lakeflow Connect.

Choisir une interface

Zerobus Ingest supporte les interfaces gRPC, REST et OpenTelemetry (OTLP). Les kits SDK fournissent des clients gRPC personnalisés avec une interface conviviale pour les développeurs pour la création d’applications à haut débit. L’interface REST gère les contraintes architecturales lors du traitement de flottes massives d’appareils « bavards ». L’interface OTLP accepte les données OpenTelemetry standard sans nécessiter de bibliothèques personnalisées.

  • Kits de développement logiciel (SDK) avec gRPC « Taxe de connexion » : gRPC se spécialise dans les performances à haut débit via des connexions persistantes. Toutefois, chaque flux ouvert compte par rapport à vos quotas d’accès concurrentiel.
  • La « taxe sur le débit » REST : REST nécessite une négociation complète pour chaque mise à jour, ce qui le rend sans état. REST s’aligne correctement sur les cas d’utilisation des appareils de périphérie où l’état est signalé rarement.
  • OpenTelemetry (OTLP) : Si vous utilisez déjà les SDK OpenTelemetry ou des collecteurs, le serveur OTLP récupère des traces, des journaux et des métriques dans des tables Delta du catalogue Unity sans besoin d'une intégration personnalisée. Pour plus d’informations, consultez Ingest OpenTelemetry data with Zerobus Ingest.

Appuyez sur gRPC pour les flux à volume élevé, REST pour les flottes massives d’appareils à faible fréquence et OTLP pour les environnements déjà instrumentés avec OpenTelemetry.

Obtenez l’URL de votre espace de travail et le point de terminaison d’ingestion de Zerobus

L’URL de votre espace de travail s’affiche dans le navigateur lorsque vous vous connectez. Alors que l’URL complète suit le format https://<databricks-instance>.net/o=XXXXX, l’URL de l’espace de travail se compose de tout avant le /o=XXXXX. Par exemple, étant donné l’URL complète suivante, vous pouvez déterminer l’URL de l’espace de travail et l’ID d’espace de travail.

  • URL complète : https://abcd-teste2-test-spcse2.azuredatabricks.net/?o=2281745829657864#
  • URL de l’espace de travail : https://abcd-teste2-test-spcse2.azuredatabricks.net
  • ID de l’espace de travail : 2281745829657864

Le point de terminaison du serveur dépend de l’espace de travail et de la région :

  • Point de terminaison du serveur : <workspace-id>.zerobus.<region>.azuredatabricks.net

Pour rechercher votre région d’espace de travail, ouvrez le sélecteur d’espace de travail dans la barre de navigation supérieure de l’interface utilisateur Databricks . La région est affichée sous chaque nom d’espace de travail (par exemple). eastus Vous pouvez également le trouver dans la console de compte sous Espaces de travail.

Pour connaître la disponibilité des régions, consultez les limitations du connecteur d’ingestion Zerobus.

Créer ou identifier la table cible

Identifiez la table cible dans laquelle vous souhaitez ingérer des données. Pour créer une table cible, exécutez la CREATE TABLE commande SQL. Par exemple, créez une table nommée unity.default.air_quality.

    CREATE TABLE unity.default.air_quality (
    device_name STRING, temp INT, humidity LONG);

Note

Pour l'ingestion d'OpenTelemetry, les tables doivent utiliser des schémas prédéfinis pour chaque type de signal (traces, logs, métriques). Consultez Créer des tables cibles dans le catalogue Unity.

Créer un principal de service et accorder des autorisations

Un principal de service est une identité spécialisée qui fournit plus de sécurité que des comptes personnalisés. Pour plus d’informations sur les principaux de services et leur utilisation pour l’authentification, consultez Autoriser l'accès au principal de service pour Azure Databricks avec OAuth.

  1. Pour créer un principal de service, accédez à Paramètres>Identité et Accès.

  2. Sous Principaux de service, sélectionnez Gérer.

  3. Cliquez sur Ajouter un principal de service.

  4. Dans la fenêtre Ajouter un principal de service , créez un principal de service en cliquant sur Ajouter nouveau.

  5. Générez et enregistrez l’ID client et la clé secrète client pour le principal de service.

  6. Accordez les autorisations requises pour le catalogue, le schéma et la table au principal du service.

    1. Dans la page principal du service , accédez à l’onglet Configurations .
    2. Copiez l’ID d’application (UUID).
    3. Utilisez le code SQL suivant pour accorder des autorisations, en remplaçant l’exemple UUID et le catalogue, le nom du schéma et les noms de tables si nécessaire.
    GRANT USE CATALOG ON CATALOG <catalog> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <catalog.schema> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <catalog.schema.table_name> TO `<UUID>`;
    

    Important

    Vous devez accorder les privilèges MODIFY et SELECT sur la table, même pour les tables avec ALL PRIVILEGES déjà attribués.

Écrire un client

Utilisez un Kit de développement logiciel (SDK) Zerobus dans votre langage de programmation préféré ou l’API REST pour ingérer des données dans votre table cible.

SDK Python

Python 3,9 ou version ultérieure est nécessaire. Le kit de développement logiciel (SDK) utilise des liaisons PyO3 vers le SDK Rust haute performance, offrant jusqu'à 40 fois plus de débit que Python pur et un I/O réseau efficace grâce au runtime asynchrone de Rust. Il prend en charge JSON (le plus simple) et Protocol Buffers (recommandé pour la production). Le SDK prend également en charge les implémentations de synchronisation et asynchrone, ainsi que 3 méthodes d’ingestion différentes (futures, basées sur offset et fire-and-forget).

pip install databricks-zerobus-ingest-sdk

Exemple JSON :

import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
SERVER_ENDPOINT="https://1234567890123456.zerobus.eastus.azuredatabricks.net"
DATABRICKS_WORKSPACE_URL="https://adb-1234567890123456.12.azuredatabricks.net"
TABLE_NAME="main.default.air_quality"
CLIENT_ID="your-client-id"
CLIENT_SECRET="your-client-secret"

sdk = ZerobusSdk(
    SERVER_ENDPOINT,
    DATABRICKS_WORKSPACE_URL
)

table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)

try:
    for i in range(1000):
        record_dict = {
            "device_name": f"sensor-{i}",
            "temp": 20 + i % 15,
            "humidity": 50 + i % 40
        }
        offset = stream.ingest_record_offset(record_dict)

        # Optional: Wait for durability confirmation
        stream.wait_for_offset(offset)
finally:
    stream.close()

Rappel d’accusé de réception : Pour suivre asynchrone la progression de l’ingestion, utilisez l’option ack_callback . Pour créer une sous-classe de AckCallback avec les méthodes on_ack(offset: int) et on_error(offset: int, error_message: str), qui sont appelées lorsque les enregistrements sont reconnus ou échouent.

from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType

class MyAckCallback(AckCallback):
    def on_ack(self, offset: int) -> None:
        print(f"Record acknowledged at offset: {offset}")

    def on_error(self, offset: int, error_message: str) -> None:
        print(f"Error at offset {offset}: {error_message}")

options = StreamConfigurationOptions(
    record_type = RecordType.JSON,
    ack_callback = MyAckCallback()
)

Mémoires tampons de protocole : Pour l’ingestion de type sécurisé, utilisez les mémoires tampons de protocole avec RecordType.PROTO (par défaut) et fournissez une descriptorProto valeur dans les propriétés de table.

Pour obtenir la documentation complète, les options de configuration, l’ingestion par lots et les exemples de mémoire tampon de protocole, consultez le référentiel Python sdk.

Rust SDK

Rust 1.70 ou version ultérieure est nécessaire. Le SDK tire parti d’E/S asynchrones et de gRPC pour l’ingestion à haut débit et sert de cœur de tous les autres kits SDK. Il prend en charge JSON (le plus simple) et Protocol Buffers (recommandé pour la production).

Tout d’abord, importez le package.

cargo add databricks-zerobus-ingest-sdk

Ou ajoutez-le à votre Cargo.toml.

[dependencies]
databricks-zerobus-ingest-sdk = "0.5.0" # Latest version at time of publication

Exemple JSON :

  use databricks_zerobus_ingest_sdk::{
      databricks::zerobus::RecordType, JsonString, StreamConfigurationOptions,
      TableProperties, ZerobusSdk,
  };
  use std::error::Error;

  // See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
  const DATABRICKS_WORKSPACE_URL: &str = "https://adb-1234567890123456.12.azuredatabricks.net";
  const SERVER_ENDPOINT: &str = "1234567890123456.zerobus.eastus.azuredatabricks.net";
  const TABLE_NAME: &str = "main.default.air_quality";
  const CLIENT_ID: &str = "your-client-id";
  const CLIENT_SECRET: &str = "your-client-secret";


  #[tokio::main]
  async fn main() -> Result<(), Box<dyn Error>> {
      let table_properties = TableProperties {
          table_name: TABLE_NAME.to_string(),
          descriptor_proto: None,
      };
      let stream_configuration_options = StreamConfigurationOptions {
          max_inflight_requests: 100,
          record_type: RecordType::Json,
          ..Default::default()
      };
      let sdk_handle = ZerobusSdk::builder()
          .endpoint(SERVER_ENDPOINT)
          .unity_catalog_url(DATABRICKS_WORKSPACE_URL)
          .build()?;

      let mut stream = sdk_handle
          .create_stream(
            table_properties.clone(),
            CLIENT_ID.to_string(),
            CLIENT_SECRET.to_string(),
            Some(stream_configuration_options),
          )
          .await?;

      let offset = stream.ingest_record_offset(
        JsonString("{
          \"device_name\": \"sensor\",
          \"temp\": 22,
          \"humidity\": 55}".to_string())).await?;

      stream.wait_for_offset(offset).await?;
      println!("Record ingested successfully");
      stream.close().await?;
      println!("Stream closed successfully");

      Ok(())
  }

Mémoires tampons de protocole : Pour l’ingestion de type sécurisé, utilisez les mémoires tampons de protocole avec RecordType::Proto (par défaut) et fournissez une descriptor_proto valeur dans les propriétés de table. Générez les fichiers nécessaires à l’aide de l’outil generate_proto et importez dans votre projet. Pour obtenir une documentation complète, des options de configuration, des exemples d’ingestion par lots, generate_proto d’outil et de mémoire tampon de protocole, consultez le référentiel du Kit de développement logiciel (SDK) Rust.

SDK Java

Java 8 ou version ultérieure est nécessaire. Le Kit de développement logiciel (SDK) utilise des liaisons JNI (Java Native Interface) au SDK Rust hautes performances, offrant une latence inférieure à celle du gRPC en Java pur et des E/S réseau efficaces via le runtime asynchrone de Rust. Il prend en charge JSON (le plus simple) et Protocol Buffers (recommandé pour la production).

Maven:

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>zerobus-ingest-sdk</artifactId>
    <version>0.2.0</version>
</dependency>

Exemple JSON :

import com.databricks.zerobus.*;

public class ZerobusClient {

// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
    private static final String SERVER_ENDPOINT =
        "https://1234567890123456.zerobus.eastus.azuredatabricks.net";
    private static final String DATABRICKS_WORKSPACE_URL =
        "https://adb-1234567890123456.12.azuredatabricks.net";
    private static final String TABLE_NAME = "main.default.air_quality";
    private static final String CLIENT_ID = "your-client-id";
    private static final String CLIENT_SECRET = "your-client-secret";

    public static void main(String[] args) throws Exception {
        ZerobusSdk sdk = new ZerobusSdk(
            SERVER_ENDPOINT,
            DATABRICKS_WORKSPACE_URL
        );

        ZerobusJsonStream stream = sdk.createJsonStream(
            TABLE_NAME, CLIENT_ID, CLIENT_SECRET
        ).join();

        try {
            long lastOffset = 0;
            for (int i = 0; i < 100; i++) {
                String record = String.format(
                    "{\"device_name\": \"sensor-%d\", \"temp\": 22, \"humidity\": 55}", i
                );
                lastOffset = stream.ingestRecordOffset(record);
            }
            stream.waitForOffset(lastOffset);
        } finally {
            stream.close();
        }
    }
}

Mémoires tampons de protocole : Pour l’ingestion de type sécurisé, utilisez ZerobusProtoStream avec createProtoStream(). Générez un schéma à partir de votre table à l’aide de l’outil JAR groupé, puis compilez-le avec protoc.

Pour obtenir une documentation complète, les options de configuration, l’ingestion par lots et les exemples de mémoire tampon de protocole, consultez le référentiel Java sdk.

Kit de développement logiciel (SDK) Go

La version 1.21 ou ultérieure est requise. Le SDK enveloppe le SDK Rust haute performance en utilisant CGO et FFI, garantissant le même débit et la même performance. Il prend en charge JSON (le plus simple) et Protocol Buffers (recommandé pour la production).

go get github.com/databricks/zerobus-sdk-go@latest

Exemple JSON :

Par souci de simplicité, les erreurs sont ignorées ici. Dans le code de production, vérifiez toujours les erreurs.

package main

import (
	"fmt"

	zerobus "github.com/databricks/zerobus-sdk-go"
)

// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const (
	ServerEndpoint         = "https://1234567890123456.zerobus.eastus.azuredatabricks.net"
	DatabricksWorkspaceURL = "https://adb-1234567890123456.12.azuredatabricks.net"
	TableName              = "main.default.air_quality"
	ClientID               = "your-client-id"
	ClientSecret           = "your-client-secret"
)

func main() {
	sdk, _ := zerobus.NewZerobusSdk(
		ServerEndpoint,
		DatabricksWorkspaceURL,
	)
	defer sdk.Free()

	options := zerobus.DefaultStreamConfigurationOptions()
	options.RecordType = zerobus.RecordTypeJson

	stream, _ := sdk.CreateStream(
		zerobus.TableProperties{
			TableName: TableName,
		},
		ClientID,
		ClientSecret,
		options,
	)
	defer stream.Close()

	offset, _ := stream.IngestRecordOffset(`{
		"device_name": "sensor-001",
		"temp": 20,
		"humidity": 60
	}`)

	_ = stream.WaitForOffset(offset)
  fmt.Println("Record ingested successfully")

  _ = stream.Close()
  fmt.Println("Stream closed successfully")
}

Mémoires tampons de protocole : Pour l’ingestion de type sécurisé, utilisez les mémoires tampons de protocole avec RecordTypeProto (par défaut) et fournissez une descriptorProto valeur dans les propriétés de table. Créez un fichier .proto correspondant à votre schéma de table et exécutez generate_proto un script pour vous aider à importer les fichiers dans votre projet.

Pour obtenir la documentation complète, les options de configuration, l’ingestion par lots, les exemples d’outil et de mémoire tampon de protocole generate_proto, consultez le référentiel du Kit de développement logiciel (SDK) Go.

Kit de développement logiciel (SDK) TypeScript

Node.js 16 ou version ultérieure est nécessaire. Le SDK encapsule le Rust SDK haute-performance à l’aide des liaisons natives de NAPI-RS, fournissant des performances natives avec des futures Rust mappées à des Promises JavaScript. Il prend en charge JSON (le plus simple) et Protocol Buffers (recommandé pour la production).

npm install @databricks/zerobus-ingest-sdk

Exemple JSON :

import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';

// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const SERVER_ENDPOINT = 'https://1234567890123456.zerobus.eastus.azuredatabricks.net';
const DATABRICKS_WORKSPACE_URL = 'https://adb-1234567890123456.12.azuredatabricks.net';
const TABLE_NAME = 'main.default.air_quality';
const CLIENT_ID = 'your-client-id';
const CLIENT_SECRET = 'your-client-secret';

const sdk = new ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL);

const stream = await sdk.createStream({ tableName: TABLE_NAME }, CLIENT_ID, CLIENT_SECRET, {
  recordType: RecordType.Json,
});

try {
  let lastOffset = BigInt(0);
  for (let i = 0; i < 100; i++) {
    const record = { device_name: `sensor-${i}`, temp: 22, humidity: 55 };
    lastOffset = await stream.ingestRecordOffset(record);
  }
  await stream.waitForOffset(lastOffset);
} finally {
  await stream.close();
}

Mémoires tampons de protocole : Pour l’ingestion de type sécurisé, utilisez les mémoires tampons de protocole avec RecordType.Proto (par défaut) et fournissez une descriptorProto valeur dans les propriétés de table.

Pour obtenir une documentation complète, les options de configuration, l’ingestion par lots et les exemples de mémoire tampon de protocole, consultez le référentiel du Kit de développement logiciel (SDK) TypeScript.

API REST

L’API REST vous permet d’ingérer un enregistrement unique en envoyant une requête HTTP POST au /zerobus/v1/tables/<table-name>/insert point de terminaison. L’enregistrement lui-même est inclus dans le corps de la demande et doit être au format JSON.

Cet exemple vous guide tout au long de l’utilisation de CURL pour envoyer (push) des données vers Zerobus Ingest à l’aide de l’API REST.

En-têtes

La requête nécessite deux en-têtes HTTP spécifiques pour s’authentifier et mettre en forme la requête correctement.

  • Content-Type : application/json
    • Champ obligatoire pour spécifier le type de contenu. Actuellement, JSON est le seul format de message pris en charge.
  • Autorisation : Bearer <token>
    • Remplacez <le token> par le jeton OAuth que vous avez récupéré avec la commande curl fournie ultérieurement.

Récupérer le jeton OAuth : Ces jetons expirent toutes les heures et doivent être actualisés. Vous pouvez les actualiser en récupérant à nouveau le jeton OAuth.

Renseignez les paramètres suivants :

  • $CATALOG, $SCHEMA, , $TABLE, $WORKSPACE_ID, $WORKSPACE_URL
  • $DATABRICKS_CLIENT_ID et $DATABRICKS_CLIENT_SECRET
    • Ces deux paramètres correspondent au principe de service que vous avez créé.
authorization_details=$(cat <<EOF
[{
  "type": "unity_catalog_privileges",
  "privileges": ["USE CATALOG"],
  "object_type": "CATALOG",
  "object_full_path": "$CATALOG"
},
{
  "type": "unity_catalog_privileges",
  "privileges": ["USE SCHEMA"],
  "object_type": "SCHEMA",
  "object_full_path": "$CATALOG.$SCHEMA"
},
{
  "type": "unity_catalog_privileges",
  "privileges": ["SELECT", "MODIFY"],
  "object_type": "TABLE",
  "object_full_path": "$CATALOG.$SCHEMA.$TABLE"
}]
EOF
)

export OAUTH_TOKEN=$(curl -X POST \
  -u "$DATABRICKS_CLIENT_ID:$DATABRICKS_CLIENT_SECRET" \
  -d "grant_type=client_credentials" \
  -d "scope=all-apis" \
  -d "resource=api://databricks/workspaces/$WORKSPACE_ID/zerobusDirectWriteApi" \
  --data-urlencode "authorization_details=$authorization_details" \
  "$WORKSPACE_URL/oidc/v1/token" | jq -r '.access_token')

Ingestion d’enregistrement unique :

Renseignez les paramètres suivants :

Le corps de la requête doit être une liste d’objets JSON.

curl -X POST \
  "$ZEROBUS_ENDPOINT/zerobus/v1/tables/$CATALOG.$SCHEMA.$TABLE/insert" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $OAUTH_TOKEN" \
  -d '[{ "device_name": "device_num_1", "temp": 28, "humidity": 60 },
       { "device_name": "device_num_1", "temp": 28, "humidity": 60 }]'

Si toutes les informations sont renseignées correctement, vous devez recevoir une réponse JSON vide avec un code d’état HTTP de 200.

Étapes suivantes