このページでは、Lakeflow Connect で Zerobus Ingest コネクタを使用してデータを取り込む方法について説明します。
インターフェイスを選択する
Zerobus Ingest では、gRPC、REST、OpenTelemetry (OTLP) インターフェイスがサポートされます。 SDK は、高スループットのアプリケーションを構築するための開発者向けのインターフェイスを備えたカスタム gRPC ベースのクライアントを提供します。 REST インターフェイスは、大量の "おしゃべり" デバイスを扱うときにアーキテクチャ上の制約を処理します。 OTLP インターフェイスは、カスタム ライブラリを必要とせずに標準の OpenTelemetry データを受け入れます。
- gRPC "接続税" を持つ SDK: gRPC は、永続的な接続による高スループットのパフォーマンスを専門としています。 ただし、開いているすべてのストリームは、同時実行制限に対してカウントされます。
- REST の「スループット負荷」: REST では、すべての更新に対して完全なハンドシェイクが必要であり、ステートレスです。 REST は、状態が報告頻度の低いエッジ デバイスのユース ケースに適しています。
- OpenTelemetry (OTLP): 既に OpenTelemetry SDK またはコレクターを使用している場合、OTLP エンドポイントは、カスタム統合を必要とせず、トレース、ログ、メトリックを Unity カタログ差分テーブルに取り込みます。 詳細については、「 Zerobus Ingest を使用した OpenTelemetry データの取り込み」を参照してください。
大規模なデバイスフリートで低頻度通信が必要な場合は REST を、大量のストリームを処理するには gRPC を、すでに OpenTelemetry で計装されている環境では OTLP プロトコルを活用してください。
ワークスペースの URL と Zerobus 取り込みエンドポイントを取得する
ログインすると、ワークスペースの URL がブラウザーに表示されます。 完全な URL は https://<databricks-instance>.net/o=XXXXXの形式に従います。ワークスペースの URL は、 /o=XXXXXの前のすべての URL で構成されます。 たとえば、次の完全な URL を指定すると、ワークスペースの URL とワークスペース ID を決定できます。
- 完全な URL:
https://abcd-teste2-test-spcse2.azuredatabricks.net/?o=2281745829657864# - ワークスペース URL:
https://abcd-teste2-test-spcse2.azuredatabricks.net - ワークスペース ID:
2281745829657864
サーバー エンドポイントは、ワークスペースとリージョンによって異なります。
- サーバー エンドポイント:
<workspace-id>.zerobus.<region>.azuredatabricks.net
ワークスペースのリージョンを見つけるには、Databricks UI の上部のナビゲーション バーでワークスペース スイッチャーを開きます。リージョンは各ワークスペース名の下に表示されます (たとえば、 eastus)。 アカウント コンソール の [ワークスペース] で見つけることもできます。
リージョンの可用性については、「 Zerobus Ingest Connector の制限事項」を参照してください。
ターゲット テーブルを作成または識別する
データを取り込むターゲット テーブルを特定します。 新しいターゲット テーブルを作成するには、 CREATE TABLE SQL コマンドを実行します。 たとえば、 unity.default.air_qualityという名前の新しいテーブルを作成します。
CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);
注
OpenTelemetry インジェストの場合、テーブルでは、シグナルの種類 (トレース、ログ、メトリック) ごとに定義済みのスキーマを使用する必要があります。 Unity カタログでのターゲット テーブルの作成を参照してください。
サービス プリンシパルを作成し、アクセス許可を付与する
サービス プリンシパルは、パーソナライズされたアカウントよりもセキュリティを提供する特殊な ID です。 サービス プリンシパルと認証に使用する方法の詳細については、「OAuth を使用したAzure Databricksへのサービス プリンシパル アクセスの認証を参照してください。
サービス プリンシパルを作成するには、 設定>Identity と Access に移動します。
[ サービス プリンシパル] で 、[ 管理] を選択します。
サービスプリンシパルの追加をクリックします。
[ サービス プリンシパルの追加 ] ウィンドウで、[新規追加] をクリックして 新しいサービス プリンシパルを作成します。
サービス プリンシパルのクライアント ID とクライアント シークレットを生成して保存します。
カタログ、スキーマ、およびテーブルに必要なアクセス許可をサービス プリンシパルに付与します。
- [ サービス プリンシパル ] ページで、[ 構成] タブに移動します。
- アプリケーション ID (UUID) をコピーします。
- 次の SQL を使用してアクセス許可を付与します。必要に応じて、UUID とカタログ、スキーマ名、テーブル名の例を置き換えます。
GRANT USE CATALOG ON CATALOG <catalog> TO `<UUID>`; GRANT USE SCHEMA ON SCHEMA <catalog.schema> TO `<UUID>`; GRANT MODIFY, SELECT ON TABLE <catalog.schema.table_name> TO `<UUID>`;Important
MODIFYが付与されているテーブルの場合でも、テーブルに対するSELECT権限とALL PRIVILEGES権限を付与する必要があります。
クライアントを書き込む
任意のプログラミング言語で Zerobus SDK を使用するか、REST API を使用してターゲット テーブルにデータを取り込みます。
Python SDK
Python 3.9 以上が必要です。 SDK では、高パフォーマンスの Rust SDK への PyO3 バインドが使用され、Rust 非同期ランタイムを介して純粋なPythonと効率的なネットワーク I/O よりも最大 40 倍高いスループットが提供されます。 JSON (最も単純) とプロトコル バッファー (運用環境に推奨) がサポートされています。 SDK では、同期と非同期の両方の実装と、3 つの異なるインジェスト方法 (将来ベース、オフセットベース、ファイア アンド フォーゲット) もサポートされています。
pip install databricks-zerobus-ingest-sdk
JSON の例:
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
# See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
SERVER_ENDPOINT="https://1234567890123456.zerobus.eastus.azuredatabricks.net"
DATABRICKS_WORKSPACE_URL="https://adb-1234567890123456.12.azuredatabricks.net"
TABLE_NAME="main.default.air_quality"
CLIENT_ID="your-client-id"
CLIENT_SECRET="your-client-secret"
sdk = ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
)
table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)
try:
for i in range(1000):
record_dict = {
"device_name": f"sensor-{i}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
offset = stream.ingest_record_offset(record_dict)
# Optional: Wait for durability confirmation
stream.wait_for_offset(offset)
finally:
stream.close()
受信確認コールバック: インジェストの進行状況を非同期的に追跡するには、 ack_callback オプションを使用します。
AckCallbackメソッドとon_ack(offset: int)メソッドを使用してon_error(offset: int, error_message: str)のサブクラスを渡します。これは、レコードが確認または失敗したときに呼び出されます。
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType
class MyAckCallback(AckCallback):
def on_ack(self, offset: int) -> None:
print(f"Record acknowledged at offset: {offset}")
def on_error(self, offset: int, error_message: str) -> None:
print(f"Error at offset {offset}: {error_message}")
options = StreamConfigurationOptions(
record_type = RecordType.JSON,
ack_callback = MyAckCallback()
)
プロトコル バッファー: タイプ セーフなインジェストの場合は、 RecordType.PROTO (既定) でプロトコル バッファーを使用し、テーブルのプロパティに descriptorProto を指定します。
完全なドキュメント、構成オプション、バッチ インジェスト、プロトコル バッファーの例については、Python SDK リポジトリを参照してください。
Rust SDK
Rust 1.70 以上が必要です。 SDK は、高スループットのインジェストのために非同期 I/O と gRPC を活用し、他のすべての SDK のコアとして機能します。 JSON (最も単純) とプロトコル バッファー (運用環境に推奨) がサポートされています。
まず、パッケージをインポートします。
cargo add databricks-zerobus-ingest-sdk
または、 Cargo.tomlに追加します。
[dependencies]
databricks-zerobus-ingest-sdk = "0.5.0" # Latest version at time of publication
JSON の例:
use databricks_zerobus_ingest_sdk::{
databricks::zerobus::RecordType, JsonString, StreamConfigurationOptions,
TableProperties, ZerobusSdk,
};
use std::error::Error;
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const DATABRICKS_WORKSPACE_URL: &str = "https://adb-1234567890123456.12.azuredatabricks.net";
const SERVER_ENDPOINT: &str = "1234567890123456.zerobus.eastus.azuredatabricks.net";
const TABLE_NAME: &str = "main.default.air_quality";
const CLIENT_ID: &str = "your-client-id";
const CLIENT_SECRET: &str = "your-client-secret";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
descriptor_proto: None,
};
let stream_configuration_options = StreamConfigurationOptions {
max_inflight_requests: 100,
record_type: RecordType::Json,
..Default::default()
};
let sdk_handle = ZerobusSdk::builder()
.endpoint(SERVER_ENDPOINT)
.unity_catalog_url(DATABRICKS_WORKSPACE_URL)
.build()?;
let mut stream = sdk_handle
.create_stream(
table_properties.clone(),
CLIENT_ID.to_string(),
CLIENT_SECRET.to_string(),
Some(stream_configuration_options),
)
.await?;
let offset = stream.ingest_record_offset(
JsonString("{
\"device_name\": \"sensor\",
\"temp\": 22,
\"humidity\": 55}".to_string())).await?;
stream.wait_for_offset(offset).await?;
println!("Record ingested successfully");
stream.close().await?;
println!("Stream closed successfully");
Ok(())
}
プロトコル バッファー: タイプ セーフなインジェストの場合は、 RecordType::Proto (既定) でプロトコル バッファーを使用し、テーブルのプロパティに descriptor_proto を指定します。
generate_proto ツールを使用して必要なファイルを生成し、プロジェクトにインポートします。
完全なドキュメント、構成オプション、バッチ インジェスト、 generate_proto ツール、プロトコル バッファーの例については、 Rust SDK リポジトリを参照してください。
Java SDK
Java 8 以上が必要です。 SDK では、高パフォーマンスの Rust SDK に対する JNI (Java ネイティブ インターフェイス) バインドが使用され、純粋なJava gRPC よりも待機時間が短く、Rust 非同期ランタイムを介した効率的なネットワーク I/O が提供されます。 JSON (最も単純) とプロトコル バッファー (運用環境に推奨) がサポートされています。
Maven:
<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.2.0</version>
</dependency>
JSON の例:
import com.databricks.zerobus.*;
public class ZerobusClient {
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
private static final String SERVER_ENDPOINT =
"https://1234567890123456.zerobus.eastus.azuredatabricks.net";
private static final String DATABRICKS_WORKSPACE_URL =
"https://adb-1234567890123456.12.azuredatabricks.net";
private static final String TABLE_NAME = "main.default.air_quality";
private static final String CLIENT_ID = "your-client-id";
private static final String CLIENT_SECRET = "your-client-secret";
public static void main(String[] args) throws Exception {
ZerobusSdk sdk = new ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
);
ZerobusJsonStream stream = sdk.createJsonStream(
TABLE_NAME, CLIENT_ID, CLIENT_SECRET
).join();
try {
long lastOffset = 0;
for (int i = 0; i < 100; i++) {
String record = String.format(
"{\"device_name\": \"sensor-%d\", \"temp\": 22, \"humidity\": 55}", i
);
lastOffset = stream.ingestRecordOffset(record);
}
stream.waitForOffset(lastOffset);
} finally {
stream.close();
}
}
}
プロトコルバッファ:型安全なインジェストには、ZerobusProtoStreamをcreateProtoStream()と共に使用します。 バンドルされた JAR ツールを使用してテーブルからスキーマを生成し、 protocでコンパイルします。
完全なドキュメント、構成オプション、バッチ インジェスト、プロトコル バッファーの例については、Java SDK リポジトリを参照してください。
Go SDK (ソフトウェア開発キット)
Go 1.21 以上が必要です。 この SDK は、CGO と FFI を使用して高パフォーマンスの Rust SDK をラップし、同じスループットとパフォーマンスを提供します。 JSON (最も単純) とプロトコル バッファー (運用環境に推奨) がサポートされています。
go get github.com/databricks/zerobus-sdk-go@latest
JSON の例:
わかりやすくするために、ここではエラーは無視されます。 運用コードでは、常にエラーを確認します。
package main
import (
"fmt"
zerobus "github.com/databricks/zerobus-sdk-go"
)
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const (
ServerEndpoint = "https://1234567890123456.zerobus.eastus.azuredatabricks.net"
DatabricksWorkspaceURL = "https://adb-1234567890123456.12.azuredatabricks.net"
TableName = "main.default.air_quality"
ClientID = "your-client-id"
ClientSecret = "your-client-secret"
)
func main() {
sdk, _ := zerobus.NewZerobusSdk(
ServerEndpoint,
DatabricksWorkspaceURL,
)
defer sdk.Free()
options := zerobus.DefaultStreamConfigurationOptions()
options.RecordType = zerobus.RecordTypeJson
stream, _ := sdk.CreateStream(
zerobus.TableProperties{
TableName: TableName,
},
ClientID,
ClientSecret,
options,
)
defer stream.Close()
offset, _ := stream.IngestRecordOffset(`{
"device_name": "sensor-001",
"temp": 20,
"humidity": 60
}`)
_ = stream.WaitForOffset(offset)
fmt.Println("Record ingested successfully")
_ = stream.Close()
fmt.Println("Stream closed successfully")
}
プロトコル バッファー: タイプ セーフなインジェストの場合は、 RecordTypeProto (既定) でプロトコル バッファーを使用し、テーブルのプロパティに descriptorProto を指定します。 テーブル スキーマに一致する .proto ファイルを作成し、 generate_proto スクリプトを実行して、ファイルをプロジェクトにインポートできるようにします。
完全なドキュメント、構成オプション、バッチ インジェスト、generate_proto ツール、プロトコル バッファーの例については、 Go SDK リポジトリを参照してください。
TypeScript SDK
Node.js 16 以上が必要です。 SDK は、NAPI-RS ネイティブ バインディングを使用して高パフォーマンスの Rust SDK をラップし、JavaScript Promise にマップされた Rust futures でネイティブ パフォーマンスを提供します。 JSON (最も単純) とプロトコル バッファー (運用環境に推奨) がサポートされています。
npm install @databricks/zerobus-ingest-sdk
JSON の例:
import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const SERVER_ENDPOINT = 'https://1234567890123456.zerobus.eastus.azuredatabricks.net';
const DATABRICKS_WORKSPACE_URL = 'https://adb-1234567890123456.12.azuredatabricks.net';
const TABLE_NAME = 'main.default.air_quality';
const CLIENT_ID = 'your-client-id';
const CLIENT_SECRET = 'your-client-secret';
const sdk = new ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL);
const stream = await sdk.createStream({ tableName: TABLE_NAME }, CLIENT_ID, CLIENT_SECRET, {
recordType: RecordType.Json,
});
try {
let lastOffset = BigInt(0);
for (let i = 0; i < 100; i++) {
const record = { device_name: `sensor-${i}`, temp: 22, humidity: 55 };
lastOffset = await stream.ingestRecordOffset(record);
}
await stream.waitForOffset(lastOffset);
} finally {
await stream.close();
}
プロトコル バッファー: タイプ セーフなインジェストの場合は、 RecordType.Proto (既定) でプロトコル バッファーを使用し、テーブルのプロパティに descriptorProto を指定します。
完全なドキュメント、構成オプション、バッチ インジェスト、プロトコル バッファーの例については、 TypeScript SDK リポジトリを参照してください。
REST API
REST API を使用すると、 エンドポイントに HTTP POST 要求を送信することで、1 つのレコードを取り込めます。 レコード自体は要求本文に含まれており、JSON 形式である必要があります。
この例では、CURL を使用して REST API を使用して Zerobus Ingest にデータをプッシュする方法について説明します。
ヘッダー
要求を正しく認証して書式設定するには、2 つの特定の HTTP ヘッダーが必要です。
-
Content-Type: application/json
- コンテンツ タイプを指定するための必須フィールド。 現在、サポートされている唯一のメッセージ形式は JSON です。
-
認証: ベアラー <トークン>
- <token>を、後で提供される curl コマンドを使用してフェッチした OAuth トークンに置き換えます。
OAuth トークンのフェッチ: これらのトークンは 1 時間ごとに期限切れになり、更新する必要があります。 それらを更新するには、OAuth トークンを再取得します。
次のパラメーターを入力します。
-
$CATALOG、$SCHEMA、$TABLE、$WORKSPACE_ID、$WORKSPACE_URL -
$DATABRICKS_CLIENT_IDおよび$DATABRICKS_CLIENT_SECRET- これら 2 つのパラメーターは、作成したサービス プリンシパルに対応します。
authorization_details=$(cat <<EOF
[{
"type": "unity_catalog_privileges",
"privileges": ["USE CATALOG"],
"object_type": "CATALOG",
"object_full_path": "$CATALOG"
},
{
"type": "unity_catalog_privileges",
"privileges": ["USE SCHEMA"],
"object_type": "SCHEMA",
"object_full_path": "$CATALOG.$SCHEMA"
},
{
"type": "unity_catalog_privileges",
"privileges": ["SELECT", "MODIFY"],
"object_type": "TABLE",
"object_full_path": "$CATALOG.$SCHEMA.$TABLE"
}]
EOF
)
export OAUTH_TOKEN=$(curl -X POST \
-u "$DATABRICKS_CLIENT_ID:$DATABRICKS_CLIENT_SECRET" \
-d "grant_type=client_credentials" \
-d "scope=all-apis" \
-d "resource=api://databricks/workspaces/$WORKSPACE_ID/zerobusDirectWriteApi" \
--data-urlencode "authorization_details=$authorization_details" \
"$WORKSPACE_URL/oidc/v1/token" | jq -r '.access_token')
単一レコードのインジェスト:
次のパラメーターを入力します。
$ZEROBUS_ENDPOINT- 「 ワークスペース URL の取得」と「Zerobus Ingest Endpoint」 セクションで定義されているとおり。
-
$CATALOG、$SCHEMA、$TABLE、$WORKSPACE_ID、$WORKSPACE_URL $OAUTH_TOKEN- これは前の手順で作成されました。
要求本文は、JSON オブジェクトの一覧である必要があります。
curl -X POST \
"$ZEROBUS_ENDPOINT/zerobus/v1/tables/$CATALOG.$SCHEMA.$TABLE/insert" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OAUTH_TOKEN" \
-d '[{ "device_name": "device_num_1", "temp": 28, "humidity": 60 },
{ "device_name": "device_num_1", "temp": 28, "humidity": 60 }]'
すべての情報が正しく入力されている場合は、HTTP 状態コードが 200 の空の JSON 応答を受け取る必要があります。