Obtenir le contexte d’une tâche dans une FDU

Utilisez l’API TaskContext PySpark pour obtenir des informations contextuelles lors de l’exécution d’une fonction UDF Python du Batch Unity Catalog ou d’un UDF PySpark.

Par exemple, les informations contextuelles telles que l’identité de l’utilisateur et les balises de cluster peuvent vérifier l’identité d’un utilisateur pour accéder à des services externes.

Spécifications

Utiliser TaskContext pour obtenir des informations contextuelles

Sélectionnez un onglet pour afficher des exemples de TaskContext pour les fonctions UDF PySpark ou les fonctions UDF Python du Batch Unity Catalog.

PySpark UDF

L’exemple suivant de UDF PySpark affiche le contexte de l’utilisateur :

@udf
def log_context():
  import json
  from pyspark.taskcontext import TaskContext
  tc = TaskContext.get()

  # Returns current user executing the UDF
  session_user = tc.getLocalProperty("user")

  # Returns cluster tags
  tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags  ") or "[]"))

  # Returns current version details
  current_version = {
    "dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
    "dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
  }

  return {
    "user": session_user,
    "job_group_id": job_group_id,
    "tags": tags,
    "current_version": current_version
  }

Fonction UDF Python Unity Catalog par lot

L’exemple UDF Python Batch Unity Catalog suivant obtient l’identité de l’utilisateur pour appeler une fonction AWS Lambda à l’aide d’informations d’identification de service :

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Can propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

Appelez l’UDF après son enregistrement :

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

Propriétés de TaskContext

La TaskContext.getLocalProperty() méthode possède les clés de propriété suivantes :

Clé de propriété Description Exemple d’utilisation
user L'utilisateur qui exécute actuellement la UDF tc.getLocalProperty("user")
->"alice"
spark.jobGroup.id ID de groupe de tâches Spark associé à la FDU actuelle tc.getLocalProperty("spark.jobGroup.id")
->"jobGroup-92318"
spark.databricks.clusterUsageTags.clusterAllTags Balises de métadonnées de cluster sous forme de paires clé-valeur formatées en tant que représentation de chaîne d’un dictionnaire JSON tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")
->[{"Department": "Finance"}]
spark.databricks.clusterUsageTags.region Région où réside l’espace de travail tc.getLocalProperty("spark.databricks.clusterUsageTags.region")
->"us-west-2"
accountId ID de compte Databricks pour le contexte d’exécution tc.getLocalProperty("accountId")
->"1234567890123456"
orgId Workspace ID (non disponible sur DBSQL) tc.getLocalProperty("orgId")
->"987654321"
spark.databricks.clusterUsageTags.sparkVersion Version de Databricks Runtime pour le cluster (sur les environnements non-DBSQL) tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")
->"16.3"
spark.databricks.clusterUsageTags.dbsqlVersion Version DBSQL (sur les environnements DBSQL) tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
->"2024.35"