Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
In SDK v2, "Passaggio di esecuzione parallela" viene consolidato nel concetto di processo come parallel job. Il job parallelo mantiene lo stesso obiettivo per permettere agli utenti di accelerare l'esecuzione del loro lavoro distribuendo le attività ripetute su potenti cluster di calcolo multi-nodo. Oltre alla fase di esecuzione parallela, il processo parallelo v2 offre vantaggi aggiuntivi.
- Interfaccia flessibile, che consente all'utente di definire più input e output personalizzati per il processo parallelo. È possibile connetterli con altri passaggi per utilizzare o gestire il contenuto nello script di immissione
- Semplificare lo schema di input, che sostituisce
Datasetcome input usando il concetto v2data asset. È possibile usare facilmente i file locali o l'URI della directory BLOB come input per il processo parallelo. - Funzionalità più avanzate disponibili solo nel processo parallelo v2. Ad esempio, riprendere un processo parallelo non riuscito o annullato per continuare l'elaborazione di mini batch non elaborati o non elaborati riutilizzando risultati riusciti, risparmiando il lavoro duplicato.
Per aggiornare il passaggio di esecuzione parallela dell'SDK v1 corrente alla versione 2, è necessario
- Usare
parallel_run_functionper creare un processo parallelo sostituendoParallelRunConfigeParallelRunStepin v1. - Aggiornare la pipeline v1 alla versione 2. Richiamare quindi il processo parallelo v2 come passaggio nella pipeline v2. Vedere come aggiornare la pipeline dalla versione 1 alla versione 2 per ulteriori dettagli sull'aggiornamento della pipeline.
Nota
Lo script di immissione è compatibile tra il passaggio di esecuzione parallelo v1 e il processo parallelo v2. È possibile continuare a usare lo stesso entry_script.py quando si aggiorna il processo di esecuzione parallela.
Questo articolo offre un confronto tra scenari in SDK v1 e SDK v2. Negli esempi seguenti verrà creato un processo parallelo per stimare i dati di input in un processo di pipeline. Si vedrà come creare un processo parallelo e come usarlo in un processo della pipeline per SDK v1 e SDK v2.
Prerequisiti
- Preparare l'ambiente SDK v2: Installare Azure Machine Learning SDK v2 per Python
- Informazioni sulla base della pipeline SDK v2: Come creare Azure Machine Learning pipeline con Python SDK v2
Creare un passaggio parallelo
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )SDK v2
# import required libraries from azure.ai.ml import Input, Output from azure.ai.ml.constants import AssetTypes, InputOutputModes from azure.ai.ml.parallel import parallel_run_function, RunFunction # parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-1.5-ubuntu22.04-py310-cpu@latest", ), )
Utilizzare la fase parallela nella pipeline
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Mapping delle funzionalità chiave in SDK v1 e SDK v2
| Funzionalità nell'SDK v1 | Mappatura approssimativa in SDK v2 |
|---|---|
|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
| OutputDatasetConfig | Output |
| dataset as_mount | Input |
Mappatura delle configurazioni e impostazioni dei processi paralleli
| SDK v1 | SDK v2 | Descrizione |
|---|---|---|
| Ambiente di ParallelRunConfig | parallel_run_function.task.environment | Ambiente in cui verrà eseguito il processo di training. |
| ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Script utente che verrà eseguito in parallelo su più nodi. |
| ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Numero di errori a livello di elemento (errori di record per i dati tabulari, errori di file per i dati di file) da ignorare nell'intero input del processo. Se questo conteggio viene superato, il processo viene contrassegnato come non riuscito.
-1 significa ignorare tutti gli errori degli elementi. Per la tolleranza di errore a livello di mini batch, usare mini_batch_error_threshold. |
| ParallelRunConfig.output_action | parallel_run_function.append_row_to | Aggregare tutti i valori restituiti da ogni esecuzione di mini batch e restituirlo in questo file. Può fare riferimento a uno degli output del processo parallelo usando l'espressione ${{outputs.<>output_name}} |
| ParallelRunConfig.node_count | parallel_run_function.instance_count | Numero facoltativo di istanze o nodi usati dalla destinazione di calcolo. Il valore predefinito è 1. |
| ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance (massima concorrenza per istanza) | Parallelismo massimo di ogni istanza di calcolo. |
| ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Definire le dimensioni di ogni mini batch per suddividere l'input. Se il input_data è una cartella o un set di file, questo numero definisce il numero di file per ogni mini batch. Ad esempio, 10, 100. Se il input_data è dati tabulari da mltable, questo numero definisce le dimensioni fisiche prossime per ogni mini batch. L'unità predefinita è Byte e il valore può accettare una stringa come 100 kb, 100 mb. |
| ParallelRunConfig.source_directory | parallel_run_function.task.code | Percorso locale o remoto che punta al codice sorgente. |
| ParallelRunConfig.descrizione | descrizione_funzione_di_esecuzione_parallela | Descrizione amichevole del parallelo |
| ParallelRunConfig.logging_level | parallel_run_function.logging_level | Stringa del nome del livello di registrazione, come definito nel modulo 'logging'. I valori possibili sono 'WARNING', 'INFO' e 'DEBUG'. (facoltativo, il valore predefinito è 'INFO'). Questo valore può essere impostato tramite PipelineParameter. |
| ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Il timeout in secondi per l'esecuzione della funzione run() personalizzata. Se il tempo di esecuzione è superiore a questa soglia, il mini batch verrà interrotto e contrassegnato come mini-batch non riuscito per attivare nuovi tentativi. |
| ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.massimo_numero_riprovi | Numero di tentativi quando il mini-batch non riesce o va in timeout. Se tutti i tentativi non riescono, il mini-batch verrà contrassegnato come non riuscito e considerato nel calcolo di mini_batch_error_threshold. |
| ParallelRunConfig.append_row_file_name | funzione_di_esecuzione_parallela.aggiungi_riga_a | Combinato con l'impostazione append_row_to. |
| ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Numero di mini batch non riusciti che potrebbero essere ignorati in questo processo parallelo. Se il numero di mini batch non riuscito è superiore a questa soglia, il processo parallelo verrà contrassegnato come non riuscito. "-1" è il numero predefinito, ovvero ignorare tutti i mini batch non riusciti durante il processo parallelo. |
| ConfigurazioneEsecuzioneParallela.percentuale_perdita_consentita | parallel_run_function.task.program_arguments impostato --allowed_failed_percent |
Analogamente a "allowed_failed_count", ma questa impostazione usa la percentuale di mini batch non riusciti anziché il numero di errori del mini batch. L'intervallo di questa impostazione è [0, 100]. "100" è il numero predefinito, ovvero ignorare tutti i mini batch non riusciti durante il processo parallelo. |
| ParallelRunConfig.chiavi_di_partizione | parallel_run_function.partition_keys | Chiavi usate per partizionare il set di dati in mini batch. I dati con la stessa chiave vengono inseriti nello stesso mini batch. Se vengono specificati sia partition_keys che mini_batch_size, le chiavi di partizione diventano effettive. |
| ParallelRunConfig.environment_variables | parallel_run_function.variabili_di_ambiente | Dizionario di nomi e valori delle variabili di ambiente. Queste variabili di ambiente vengono impostate nel processo in cui viene eseguito lo script utente. |
| ParallelRunStep.name | parallel_run_function.name | Nome del processo o del componente parallelo creato. |
| ParallelRunStep.inputs | parallel_run_function.inputs | Un dizionario di input usato da questo processo parallelo. |
| -- | parallel_run_function.input_data | Dichiarare i dati da suddividere ed elaborare parallelamente |
| ParallelRunStep.output | parallel_run_function.outputs | Output di questo processo parallelo. |
| ParallelRunStep.side_inputs | parallel_run_function.inputs | Definito insieme a inputs. |
| ParallelRunStep.arguments | parallel_run_function.task.argomenti_programma | Argomenti dell'attività parallela. |
| ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Indicare se il parallelo restituirà l'output identico per lo stesso input. |
Passaggi successivi
Per altre informazioni, vedere la documentazione qui:
- Esempi di "Parallel run step SDK v1"
- Esempi Parallel job SDK v2