Använd en notebook för att komma åt en databasinstans

Viktigt!

Lakebase Provisioned är det ursprungliga Lakebase-erbjudandet som använder etablerad beräkning som du skalar manuellt. Information om regioner som stöds finns i Regiontillgänglighet. Den senaste versionen av Lakebase, med beräkning av automatisk skalning, skalning till noll, förgrening och omedelbar återställning, finns i Lakebase Autoscaling.

Nya Lakebase-exemplar skapas som automatisk skalningsprojekt. Distributionen startar 12 mars 2026. Mer information finns i Autoskalning som standard.

Den här sidan innehåller kodexempel som visar hur du kommer åt din Lakebase-databasinstans via Azure Databricks-notebook-filer och kör frågor med Python och Scala.

Exemplen omfattar olika anslutningsstrategier som passar olika användningsfall:

  • Enkel anslutning: Används för enkla skript där en enkel databasanslutning öppnas, används och stängs.
  • Anslutningspool: Används för arbetsbelastningar med hög samtidighet, där en pool med återanvändbara anslutningar underhålls.
  • Roterande M2M OAuth-token: Använder kortlivade, automatiskt uppdaterade OAuth-token för autentisering.

Följande exempel genererar programmatiskt säkra autentiseringsuppgifter. Undvik att ange autentiseringsuppgifter direkt i en notebook-fil. Databricks rekommenderar att du använder någon av följande säkra metoder:

  • Lagra Postgres-lösenord i Azure Databricks-hemligheter.
  • Generera OAuth-token med M2M OAuth.

Innan du börjar

Se till att du uppfyller följande krav innan du får åtkomst till din databasinstans:

  • Du har en motsvarande Postgres-roll för att logga in på databasinstansen. Se Hantera PostgreSQL-roller.
  • Din Postgres-roll beviljas de behörigheter som krävs för att få åtkomst till databasen, schemat eller tabellen.
  • Du kan autentisera till databasinstansen. Om du måste hämta en OAuth-token manuellt för din databasinstans läser du Autentisera till en databasinstans.

Varning

Om du använder en privat länk måste du använda ett kluster med en enda användare.

python

Azure Databricks Python SDK kan användas för att hämta en OAuth-token för en respektive databasinstans.

Anslut till din databasinstans från en Azure Databricks-notebook-fil med hjälp av följande Python-bibliotek:

  • psycopg (psycopg3)
  • psycopg med anslutningspoolning
  • SQLAlchemy

Förutsättningar

Innan du kör följande kodexempel uppgraderar du Databricks SDK för Python till version 0.61.0 eller senare och startar sedan om Python.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg

Kodexemplen visar en enda anslutning och användningen av en anslutningspool. Mer information om hur du hämtar databasinstansen och autentiseringsuppgifterna programmatiskt finns i hur du hämtar en OAuth-token med hjälp av Python SDK.

%pip install "psycopg[binary,pool]"

Enkel anslutning

import psycopg

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Anslutningspool

import psycopg
from psycopg_pool import ConnectionPool

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = ConnectionPool(
    conninfo=f"dbname=databricks_postgres user=<YOUR USER> host={instance.read_write_dns} port=5432 password={cred.token} sslmode=require",
    min_size=1,  # Minimum number of connections in the pool
    max_size=10,  # Maximum number of connections in the pool
    open=True
)
print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    with connection_pool.connection() as connection:
        print("Successfully received a connection from the pool")
        execFn(connection)
        print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

Kodexemplet visar användningen av en anslutningspool med en roterande M2M OAuth. Den använder generate_database_credential(). Mer information om hur du hämtar databasinstansen och autentiseringsuppgifterna programmatiskt finns i hur du hämtar en OAuth-token med hjälp av Python SDK.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

Kodexemplen visar en enda anslutning och användningen av en anslutningspool med en roterande M2M OAuth-token. Mer information om hur du hämtar databasinstansen och autentiseringsuppgifterna programmatiskt finns i hur du hämtar en OAuth-token med hjälp av Python SDK.

Enkel anslutning

%pip install "sqlalchemy>=2.0" "psycopg[binary]"
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql+psycopg://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Anslutningspool och roterande M2M OAuth

%pip install "sqlalchemy>=2.0" "psycopg[binary]"
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

Kodexemplen visar hur du programmatiskt hämtar databasinstansen och autentiseringsuppgifterna och hur du ansluter till en databasinstans med en enda anslutning eller en anslutningspool.

Steg 1: Använd Azure Databricks Java SDK för att hämta en OAuth-token

Mer information om hur du hämtar databasinstansen och autentiseringsuppgifterna programmatiskt finns i hur du hämtar en OAuth-token med hjälp av Java SDK.

Steg 2: Anslut till en databasinstans

Enkel anslutning

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Anslutningspool

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()