Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
As versões de ambiente para SDP estão em Beta.
Pipelines com uma versão environment definir Python código de execução por meio do Spark Connect. Esta página aborda o que é incompatível, o que se comporta de forma diferente, como verificar um pipeline em busca de padrões afetados e como migrar um pipeline existente.
Limitações
As versões de ambiente ainda não são compatíveis com todas as funcionalidades de pipeline. Uma execução de pipeline com um conjunto de versões de ambiente falhará se o código de Python do pipeline fizer o seguinte:
- Modifica o estado da sessão spark dentro de uma função decorada com um decorador de pipelines. Exemplos incluem
spark.conf.set(...),spark.sql("USE CATALOG ...")ecreateOrReplaceTempView. - Usa APIs do PySpark que não estão disponíveis no Spark Connect, incluindo
SparkContext,RDDeSQLContextqualquer APIs Py4J. Veja o que tem suporte no Spark Connect.
Se a habilitação de uma versão de ambiente em um pipeline causar falha, desabilitar a versão do ambiente retornará o pipeline ao estado anterior.
Alterações de comportamento
O Spark Connect tem um pequeno número de diferenças de comportamento em relação ao runtime clássico do PySpark. Consulte Spark Connect vs. Spark clássico para obter a referência completa. A verificação de compatibilidade detecta esses padrões com antecedência e bloqueia a habilitação até que eles sejam resolvidos, para que você possa encontrá-los e corrigi-los antes que eles afetem os dados de produção.
Em um pipeline, as situações mais comuns em que o comportamento pode ser diferente são:
- Construção de DataFrame intercalada e mutação de sessão
- UDFs que fazem referência ao estado Python mutável
Construção de DataFrame intercalada e mutação de sessão
Quando um pipeline constrói um DataFrame, depois altera o estado da sessão do Spark (por exemplo, altera o catálogo ou esquema padrão, define uma configuração, substitui uma exibição temporária ou registra novamente um UDF) e usa o DataFrame:
- Sem uma versão de ambiente, o DataFrame usa o estado da sessão de pré-mutação .
- Com uma versão de ambiente, o DataFrame usa o estado da sessão pós-mutação .
Por exemplo:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Sem uma versão de ambiente, mytable contém [(1, "Original Row")]. Com uma versão de ambiente, mytable contém [(2, "Replaced Row")].
UDFs que fazem referência ao estado de Python mutável
Quando uma UDF faz referência a uma variável global Python cujo valor é alterado após a definição da UDF:
- Sem uma versão de ambiente, a UDF usa o valor mais recente da variável.
- Com uma versão de ambiente, a UDF usa o valor no momento em que a UDF foi definida.
Por exemplo:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Sem uma versão de ambiente, my_mv contém [("alex_b",)]. Com uma versão de ambiente, my_mv contém [("alex_a",)].
Se um pipeline depender de qualquer padrão, audite-o antes de habilitar uma versão do ambiente.
Verificação de compatibilidade
A verificação de compatibilidade ajuda você a encontrar padrões de código em seu pipeline que produziriam resultados diferentes em uma versão de ambiente, antes de habilitar um. A verificação é aceita. Quando a verificação está habilitada em um pipeline:
- Cada execução de pipeline emite um
BehaviorChangeInSparkConnectWARNevento no log de eventos do pipeline por padrão detectado. - Você não pode habilitar uma versão de ambiente no pipeline até resolver todos os avisos de compatibilidade da atualização bem-sucedida anterior.
Se a verificação não estiver habilitada, nenhum evento será emitido e environment_version a habilitação não será bloqueada. O Databricks recomenda habilitar a verificação e resolver os padrões detectados antes de habilitar uma versão de ambiente no pipeline.
Habilitar a verificação em um pipeline
Você pode habilitar a verificação de compatibilidade adicionando a configuração de pipelines.environmentVersion.enableCompatibilityScan pipeline Você pode adicionar configuração por meio da interface do usuário do editor de pipeline ou adicionando uma entrada ao JSON de configuração de pipeline.
Por meio da interface do usuário:
- No editor de pipeline, clique em Configurações.
- Localize a seção Configuração nas configurações do pipeline.
- Clique
Adicionar configuração.
- Insira
pipelines.environmentVersion.enableCompatibilityScancomo a chave etruecomo o valor. - Salve as configurações de pipeline.
No JSON do pipeline:
Adicione a seguinte entrada ao configuration bloco:
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
Fluxo de trabalho recomendado
- Habilite a verificação no pipeline.
- Execute uma execução de pipeline.
-
Consulte o log de eventos do pipeline para
BehaviorChangeInSparkConnectWARNeventos. Consulte a referência de eventos de compatibilidade para a lista completa de códigos de problema, padrões de exemplo e correções sugeridas. - Atualize o código do pipeline para remover os padrões detectados e execute o pipeline novamente até que nenhum outro evento seja emitido.
- Adicione
environment_versionao pipeline usando um dos métodos em Habilitar uma versão de ambiente em um pipeline.
Se você acredita que um aviso de compatibilidade é um falso positivo e deseja habilitar environment_version de qualquer maneira, remova a pipelines.environmentVersion.enableCompatibilityScan entrada da configuração do pipeline para ignorar a verificação. (Definir o valor como false não é permitido – você deve remover a entrada inteiramente.)
A verificação de pré-vôo não é executada em pipelines que não têm nenhuma atualização anterior ou em pipelines que já têm uma versão de ambiente definida.
Migrar um pipeline existente para versões de ambiente
Para migrar um pipeline existente que ainda não usa uma versão de ambiente, siga este fluxo de trabalho de ponta a ponta. Ele orienta você a encontrar padrões de código que podem se comportar de forma diferente no Spark Connect, corrigi-los e distribuir a versão do ambiente com segurança.
Habilite a verificação de compatibilidade no pipeline. Habilite a verificação no pipeline, conforme descrito na verificação de compatibilidade. Isso faz com que os padrões detectados sejam exibidos no log de eventos e o que permite a verificação de pré-vôo que protege sua tentativa de habilitação.
Dispare uma execução de pipeline e examine os eventos de compatibilidade. Dispare uma atualização normal do pipeline. Depois que for concluído com êxito, consulte o log de eventos do pipeline em busca de
BehaviorChangeInSparkConnectWARNeventos. Cada evento relata um padrão detectado. Consulte a referência de eventos de compatibilidade para a lista completa de códigos de problema, padrões de exemplo e correções sugeridas.Atualize o código do pipeline para atender aos padrões detectados. Para cada padrão detectado, atualize o código do pipeline após a correção sugerida. Após cada alteração, dispare outra atualização de pipeline e verifique se os eventos correspondentes não aparecem mais. Repita até que o log de eventos não exiba mais eventos de compatibilidade para uma atualização bem-sucedida.
Habilite a versão do ambiente no pipeline. Depois que a atualização bem-sucedida mais recente não tiver eventos de compatibilidade, adicione
environment_versionao pipeline usando a interface do usuário, a API ou o pacote, conforme descrito em Habilitar uma versão de ambiente em um pipeline. A próxima atualização é executada com o Spark Connect e a versão de linguagem Python fixada e bibliotecas pré-instaladas.Se a atualização falhar porque os avisos de compatibilidade ainda existem, solte a
environment_versionetapa 2 e resolva os avisos restantes antes de tentar novamente.Verifique a migração. Após a conclusão da primeira atualização com a versão do ambiente, verifique:
- O
create_updateevento no log de eventos mostraenvironment_versiondefinido como o valor esperado. - O pipeline produz os dados esperados e nenhum novo evento de erro é exibido.
- Verifique as tabelas downstream spot para quaisquer diferenças de comportamento sutis descritas nas alterações de comportamento.
- O
Reversão
Se o pipeline se comportar mal após a migração, remova as environment_version configurações do pipeline. A próxima atualização é executada com a configuração de runtime Python anterior. Use a execução revertida para depurar e repita a migração da etapa 2 depois de identificar e corrigir o problema.
Referência de eventos de compatibilidade
Quando a verificação de compatibilidade é habilitada em um pipeline, o SDP emite um BehaviorChangeInSparkConnectWARN evento no log de eventos do pipeline por padrão detectado. Quando a verificação está habilitada e a atualização anterior bem-sucedida detectou quaisquer padrões, o SDP também bloqueia environment_version a habilitação até que os padrões sejam resolvidos.
Cada evento relata um único código de problema que identifica o que foi detectado. Para pesquisar um código, localize-o na tabela Códigos de problema – cada linha vincula à seção de categoria que contém um padrão de exemplo e a correção sugerida.
Forma do evento
BehaviorChangeInSparkConnect os eventos seguem o esquema de log de eventos de pipeline padrão:
-
event_typeébehavior_change_in_spark_connect. -
leveléWARN. -
detailscontém obehavior_change_in_spark_connectobjeto, que tem um únicoissuecampo. O valor do problema é um dos códigos listados abaixo. -
messageé uma descrição legível pelo ser humano do padrão detectado.
Emitir códigos
| Categoria | Código de problema | Description |
|---|---|---|
| Mutações de banco de dados e catálogo | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
O catálogo padrão foi alterado depois que um DataFrame foi criado. O DataFrame existente pode resolver tabelas usando o novo catálogo padrão. |
| Mutações de banco de dados e catálogo | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE CATALOG foi chamado fora de uma função decorada por um decorador de pipelines. O catálogo padrão pode ser alterado inesperadamente para operações subsequentes. |
| Mutações de banco de dados e catálogo | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
O banco de dados padrão foi alterado depois que um DataFrame foi criado. O DataFrame existente pode resolver tabelas usando o novo banco de dados padrão. |
| Mutações de banco de dados e catálogo | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE DATABASE foi chamado fora de uma função decorada por um decorador de pipelines. O banco de dados padrão pode ser alterado inesperadamente para operações subsequentes. |
| Execução ansiosa dentro de funções de fluxo | CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo chama um comando de ponto de verificação. |
| Execução ansiosa dentro de funções de fluxo | CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo cria ansiosamente um modo de exibição DataFrame (createOrReplaceTempView ou semelhante). |
| Execução ansiosa dentro de funções de fluxo | CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo cria um perfil de recurso. |
| Execução ansiosa dentro de funções de fluxo | GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo chama spark.resources ou uma API de recurso relacionada. |
| Execução ansiosa dentro de funções de fluxo | MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo executa uma ávida MERGE INTO em uma tabela de destino. |
| Execução ansiosa dentro de funções de fluxo | ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo executa uma operação do Spark ML ansiosa. |
| Execução ansiosa dentro de funções de fluxo | REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo registra uma fonte de dados Python. |
| Execução ansiosa dentro de funções de fluxo | STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo opera em um identificador de consulta de streaming ativo. |
| Execução ansiosa dentro de funções de fluxo | STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo registra ou remove um ouvinte de consulta de streaming. |
| Execução ansiosa dentro de funções de fluxo | STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo chama spark.streams para gerenciar consultas de streaming. |
| Execução ansiosa dentro de funções de fluxo | WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo executa uma operação ansiosa DataFrameWriterV2 . |
| Execução ansiosa dentro de funções de fluxo | WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo executa uma operação ansiosa DataFrame.write . |
| Execução ansiosa dentro de funções de fluxo | WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
A função de fluxo inicia uma consulta de streaming (writeStream.start()). |
| Mutações de configuração do Spark | CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED |
spark.conf.set() ou spark.conf.unset() foi chamado dentro de uma função decorada por um decorador de pipelines. Não há suporte para isso com uma versão de ambiente. |
| Mutações de configuração do Spark | SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.set() foi chamado fora de uma função decorada por um decorador de pipelines depois que um DataFrame foi criado. A alteração da configuração pode afetar o DataFrame existente no momento da execução. |
| Mutações de configuração do Spark | UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.unset() foi chamado fora de uma função decorada por um decorador de pipelines depois que um DataFrame foi criado. A alteração da configuração pode afetar o DataFrame existente no momento da execução. |
| Substituições de exibição temporária | REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Uma exibição temporária global foi substituída depois que um DataFrame que fazia referência a ele foi criado. A substituição pode ser refletida no DataFrame existente. |
| Substituições de exibição temporária | REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Uma exibição temporária foi substituída depois que um DataFrame que fazia referência a ele foi criado. A substituição pode ser refletida no DataFrame existente. |
| Mutações UDF e UDTF | OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Uma UDF foi re-registrada com o mesmo nome depois que um DataFrame que fazia referência a ele foi criado. O DataFrame existente pode usar a nova definição de UDF. |
| Mutações UDF e UDTF | OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Um UDTF foi registrado novamente com o mesmo nome depois que um DataFrame que fazia referência a ele foi criado. O DataFrame existente pode usar a nova definição UDTF. |
| Mutações UDF e UDTF | UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Uma UDF faz referência a uma variável de Python mutável global. Com uma versão de ambiente, a UDF usa o valor da variável no momento em que a UDF foi definida, não no momento da invocação. |
| Mutações UDF e UDTF | UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Um UDTF faz referência a uma variável de Python mutável global. Com uma versão de ambiente, o UDTF usa o valor da variável no momento em que o UDTF foi definido, não no momento da invocação. |
Mutações de banco de dados e catálogo
Esses problemas são emitidos quando o código do pipeline altera o banco de dados ou catálogo padrão. Com uma versão de ambiente, dataframes construídos antes da mutação podem resolver tabelas usando o novo banco de dados ou catálogo.
Padrão de exemplo que dispara um evento:
from pyspark import pipelines as dp
spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales") # changes the default catalog after df was created
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Sem uma versão de ambiente, df resolve events do marketing catálogo. Com uma versão de ambiente, df é resolvida events a partir do sales catálogo.
Correção sugerida: Qualifique totalmente os nomes de tabela para que a resolução não dependa do catálogo ou banco de dados padrão e evite alterar o catálogo ou o banco de dados padrão entre a criação e o uso do DataFrame.
from pyspark import pipelines as dp
df = spark.read.table("marketing.default.events")
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Mutações de configuração do Spark
Esses problemas são emitidos quando o código de pipeline altera a configuração do Spark de maneiras que podem alterar o comportamento do DataFrame em uma versão de ambiente.
Padrão de exemplo que dispara um evento:
from pyspark import pipelines as dp
df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created
@dp.materialized_view
def events_strict():
return df.selectExpr("CAST(price AS INT) AS price")
Sem uma versão de ambiente, a conversão usa o valor de configuração na hora de criação do DataFrame. Com uma versão de ambiente, a conversão usa spark.sql.ansi.enabled=true e pode falhar na entrada inválida.
Correção sugerida: Defina todas as configurações necessárias do Spark na parte superior do arquivo de pipeline, antes que qualquer DataFrame seja criado. Para a configuração por consulta, use a configuração do configuration pipeline na especificação do pipeline.
Substituições de exibição temporária
Esses problemas são emitidos quando o código de pipeline substitui uma exibição temporária depois que um DataFrame que faz referência a ele foi criado. Com uma versão de ambiente, o DataFrame existente pode refletir o novo conteúdo de exibição.
Padrão de exemplo que dispara um evento:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Sem uma versão de ambiente, mytable contém [(1, "Original Row")]. Com uma versão de ambiente, mytable contém [(2, "Replaced Row")].
Correção sugerida: Crie cada exibição temporária uma única vez e não a substitua. Se você precisar de várias exibições com dados relacionados, dê a cada um um nome distinto.
Mutações UDF e UDTF
Esses problemas são emitidos quando o código de pipeline altera uma UDF ou UDTF de maneiras que alteram o comportamento em uma versão de ambiente.
Padrão de exemplo que dispara um evento:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Sem uma versão de ambiente, my_mv contém [("alex_b",)]. Com uma versão de ambiente, my_mv contém [("alex_a",)].
Suggested fix: Pass values into the UDF as arguments instead of capture them from Python globals, or set the global before de define the UDF and do not mutate it afterward.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf
@udf
def append_suffix(s, suffix):
return s + suffix
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
Execução ansiosa dentro de funções de fluxo
Esses problemas são emitidos quando o código de pipeline executa um comando Spark ansioso dentro de uma função decorada por um decorador de pipelines (@table, @materialized_viewetc.). Espera-se que as funções de fluxo definam e retornem um DataFrame; Comandos ansiosos que gravam dados, gerenciam consultas de streaming, registram recursos ou executam operações de ML não são permitidos dentro de uma função de fluxo com um conjunto de versões de ambiente.
Correção sugerida: Mova a operação ansiosa para fora da função de fluxo e retorne um DataFrame da função de fluxo. Efeitos colaterais, como gravar em uma tabela ou iniciar uma consulta de streaming, pertencem fora da definição do pipeline; o mecanismo de pipeline manipula a materialização do DataFrame retornado pela função de fluxo.
Localizar eventos de compatibilidade no log de eventos
A consulta a seguir retorna todos os eventos de compatibilidade de um pipeline, ordenados primeiro:
SELECT
timestamp,
message,
details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
ORDER BY timestamp DESC;
Para contar eventos por código de emissão em atualizações recentes:
SELECT
details:behavior_change_in_spark_connect:issue AS issue,
COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
Para saber como consultar o log de eventos, consulte Consultar o log de eventos.
Consulte também
- Configurar versões de ambiente para pipelines — visão geral do recurso, como habilitar uma versão do ambiente.
- Esquema de log de eventos de pipeline – esquema de log de eventos de pipeline completo.
- Log de eventos do pipeline – como consultar o log de eventos do pipeline.