Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Importante
- Esta documentación se ha retirado y es posible que no se actualice. Los productos, servicios o tecnologías mencionados en este contenido ya no se admiten.
- Databricks recomienda usar
ai_querypara la inferencia por lotes en su lugar. Consulte Enriquecimiento de datos mediante ai Functions.
En este artículo se muestra cómo usar Transformers de Hugging Face para la inferencia de modelos de procesamiento de lenguaje natural (NLP).
Transformers de Hugging Face proporciona la clase pipelines para usar el modelo preentrenado para realizar inferencias. 🤗 Las canalizaciones de Transformers admiten una amplia gama de tareas de NLP que puede usar fácilmente en Azure Databricks.
Requisitos
- MLflow 2.3
- Cualquier clúster con la biblioteca Hugging Face de
transformersinstalada se puede usar para la inferencia por lotes. La biblioteca detransformersviene preinstalada en Databricks Runtime 10.4 LTS ML y versiones posteriores. Muchos de los modelos de NLP populares funcionan mejor en el hardware de GPU. Por ello es posible que obtenga el mejor rendimiento mediante un hardware de GPU reciente, a menos que use un modelo optimizado específicamente para su uso en CPU.
Uso de UDF de Pandas para distribuir el cálculo del modelo en un clúster de Spark
Al experimentar con modelos entrenados previamente, puede usar Pandas UDFs para encapsular el modelo y realizar cálculos en CPUs o GPUs de los trabajadores. Las UDFs de Pandas distribuyen el modelo a cada trabajador.
También puede crear una canalización de Transformers de Hugging Face para la traducción automática y usar UDF de Pandas para ejecutar la canalización en los trabajadores de un clúster de Spark.
import pandas as pd
from transformers import pipeline
import torch
from pyspark.sql.functions import pandas_udf
device = 0 if torch.cuda.is_available() else -1
translation_pipeline = pipeline(task="translation_en_to_fr", model="t5-base", device=device)
@pandas_udf('string')
def translation_udf(texts: pd.Series) -> pd.Series:
translations = [result['translation_text'] for result in translation_pipeline(texts.to_list(), batch_size=1)]
return pd.Series(translations)
Establecer device de esta manera garantiza que las GPU se usen si están disponibles en el clúster.
Las canalizaciones de Hugging Face para la traducción devuelven una lista de objetos de Python dict, cada uno con una sola clave translation_text y un valor que contiene el texto traducido. Esta UDF extrae la traducción de los resultados para devolver una serie de Pandas con solo el texto traducido. Si la canalización se construyó para usar GPU al establecer device=0, Spark reasigna automáticamente las GPU en los nodos de trabajo si el clúster tiene instancias con varias GPU.
Para usar la UDF para traducir una columna de texto, puede llamar a la UDF en una instrucción select.
texts = ["Hugging Face is a French company based in New York City.", "Databricks is based in San Francisco."]
df = spark.createDataFrame(pd.DataFrame(texts, columns=["texts"]))
display(df.select(df.texts, translation_udf(df.texts).alias('translation')))
Devolver tipos de resultados complejos
Con las UDF de Pandas, también puede devolver una salida más estructurada. Por ejemplo, en el reconocimiento de entidades con nombre, las canalizaciones devuelven una lista de objetos dict que contienen la entidad, su intervalo, tipo y una puntuación asociada. Aunque es similar al ejemplo de traducción, el tipo de valor devuelto de la anotación @pandas_udf es más complejo en el caso del reconocimiento de entidades con nombre.
Puede obtener una idea de los tipos de retorno que se deben usar mediante la inspección de los resultados del pipeline, por ejemplo, al ejecutar el pipeline en el driver.
En este ejemplo, use el código siguiente:
from transformers import pipeline
import torch
device = 0 if torch.cuda.is_available() else -1
ner_pipeline = pipeline(task="ner", model="Davlan/bert-base-multilingual-cased-ner-hrl", aggregation_strategy="simple", device=device)
ner_pipeline(texts)
Para producir las anotaciones:
[[{'entity_group': 'ORG',
'score': 0.99933606,
'word': 'Hugging Face',
'start': 0,
'end': 12},
{'entity_group': 'LOC',
'score': 0.99967843,
'word': 'New York City',
'start': 42,
'end': 55}],
[{'entity_group': 'ORG',
'score': 0.9996372,
'word': 'Databricks',
'start': 0,
'end': 10},
{'entity_group': 'LOC',
'score': 0.999588,
'word': 'San Francisco',
'start': 23,
'end': 36}]]
Para representar esto como un tipo de valor devuelto, puede usar un array con campos struct, enumerando las entradas dict como los campos del struct.
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('array<struct<word string, entity_group string, score float, start integer, end integer>>')
def ner_udf(texts: pd.Series) -> pd.Series:
return pd.Series(ner_pipeline(texts.to_list(), batch_size=1))
display(df.select(df.texts, ner_udf(df.texts).alias('entities')))
Ajustar rendimiento
A la hora de optimizar el rendimiento de la UDF se deben considerar varios aspectos clave. El primero consiste en usar cada GPU de forma eficaz, lo que se puede ajustar cambiando el tamaño de los lotes enviados a la GPU por la canalización de Transformers. El segundo es asegurarse de que el DataFrame está bien particionado para usar todo el clúster.
Por último, puede que desee almacenar en caché el modelo de Hugging Face para ahorrar tiempo de carga del modelo o costes de entrada.
Elegir un tamaño de lote
Aunque las UDF descritas anteriormente deben funcionar de forma predeterminada con un batch_size de 1, es posible que esto no use los recursos disponibles para los trabajadores de forma eficaz. Para mejorar el rendimiento, ajuste el tamaño del lote al modelo y al hardware del clúster. Databricks recomienda probar distintos tamaños de lote para la canalización en el clúster y encontrar el mejor rendimiento. Obtenga más información sobre el procesamiento en lotes de pipelines y otras opciones de rendimiento en la documentación de Hugging Face.
Intente encontrar un tamaño de lote lo suficientemente grande para que impulse el uso completo de la GPU, sin producir errores de CUDA out of memory. Cuando reciba CUDA out of memory errores durante el ajuste, debe iniciar una nueva sesión para liberar la memoria usada por el modelo y los datos en la GPU.
Supervise el rendimiento de la GPU mediante la visualización de las métricas del clúster activas de un clúster y la elección de una métrica, como gpu0-util para el uso del procesador de GPU o gpu0_mem_util para el uso de memoria de GPU.
Ajuste del paralelismo mediante la programación de nivel de etapa
De forma predeterminada, Spark programa una tarea por GPU en cada máquina. Para aumentar el paralelismo, puede indicar a Spark cuántas tareas se van a ejecutar por GPU mediante la programación de nivel de fase. Por ejemplo, si desea que Spark ejecute dos tareas por GPU, puede especificar esto de la siguiente manera:
from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder
task_requests = TaskResourceRequests().resource("gpu", 0.5)
builder = ResourceProfileBuilder()
resource_profile = builder.require(task_requests).build
rdd = df.withColumn('predictions', loaded_model(struct(*map(col, df.columns)))).rdd.withResources(resource_profile)
Datos de repartición para usar todo el hardware disponible
La segunda consideración para el rendimiento es hacer un uso completo del hardware en el clúster. Por lo general, funciona bien usar un pequeño múltiplo del número de GPU en los trabajadores (para clústeres de GPU) o el número de núcleos en los trabajadores del clúster (para clústeres de CPU). Es posible que la trama de datos de entrada ya tenga suficientes particiones para aprovechar el paralelismo del clúster. Para ver cuántas particiones contiene el DataFrame, use df.rdd.getNumPartitions(). Puede volver a particionar un DataFrame mediante repartitioned_df = df.repartition(desired_partition_count).
Almacenar en caché el modelo en DBFS o en puntos de montaje
Si está cargando con frecuencia un modelo de clústeres diferentes o reiniciados, también puede almacenar en caché el modelo de Hugging Face en el volumen raíz de DBFS o en un punto de montaje. Esto puede reducir los costes de entrada y disminuir el tiempo de carga del modelo en un clúster nuevo o reiniciado. Para ello, establezca la variable de entorno en el código TRANSFORMERS_CACHE antes de cargar la canalización.
Por ejemplo:
import os
os.environ['TRANSFORMERS_CACHE'] = '/dbfs/hugging_face_transformers_cache/'
También puede lograr resultados similares si registra el modelo en MLflowtransformers con el tipo de MLflow.
Cuaderno: Transformers de Hugging Face, inferencia y registro con MLflow
Para empezar rápidamente con código de ejemplo, este cuaderno es un ejemplo de extremo a extremo para la síntesis de texto mediante el uso de la inferencia en las canalizaciones de Transformers de Hugging Face y el registro con MLflow.
Cuaderno de inferencia de canalizaciones de Transformers de Hugging Face
Recursos adicionales
Puede ajustar el modelo de Hugging Face con las siguientes guías:
- Preparar los datos para ajustar los modelos de Hugging Face
- Ajuste preciso de los modelos de Hugging Face para una sola GPU
Más información sobre ¿Qué son los Transformers de Hugging Face?