Aggiornare il passaggio di esecuzione parallela a SDK v2

In SDK v2, "Passaggio di esecuzione parallela" viene consolidato nel concetto di processo come parallel job. Il job parallelo mantiene lo stesso obiettivo per permettere agli utenti di accelerare l'esecuzione del loro lavoro distribuendo le attività ripetute su potenti cluster di calcolo multi-nodo. Oltre alla fase di esecuzione parallela, il processo parallelo v2 offre vantaggi aggiuntivi.

  • Interfaccia flessibile, che consente all'utente di definire più input e output personalizzati per il processo parallelo. È possibile connetterli con altri passaggi per utilizzare o gestire il contenuto nello script di immissione
  • Semplificare lo schema di input, che sostituisce Dataset come input usando il concetto v2 data asset . È possibile usare facilmente i file locali o l'URI della directory BLOB come input per il processo parallelo.
  • Funzionalità più avanzate disponibili solo nel processo parallelo v2. Ad esempio, riprendere un processo parallelo non riuscito o annullato per continuare l'elaborazione di mini batch non elaborati o non elaborati riutilizzando risultati riusciti, risparmiando il lavoro duplicato.

Per aggiornare il passaggio di esecuzione parallela dell'SDK v1 corrente alla versione 2, è necessario

  • Usare parallel_run_function per creare un processo parallelo sostituendo ParallelRunConfig e ParallelRunStep in v1.
  • Aggiornare la pipeline v1 alla versione 2. Richiamare quindi il processo parallelo v2 come passaggio nella pipeline v2. Vedere come aggiornare la pipeline dalla versione 1 alla versione 2 per ulteriori dettagli sull'aggiornamento della pipeline.

Nota

Lo script di immissione è compatibile tra il passaggio di esecuzione parallelo v1 e il processo parallelo v2. È possibile continuare a usare lo stesso entry_script.py quando si aggiorna il processo di esecuzione parallela.

Questo articolo offre un confronto tra scenari in SDK v1 e SDK v2. Negli esempi seguenti verrà creato un processo parallelo per stimare i dati di input in un processo di pipeline. Si vedrà come creare un processo parallelo e come usarlo in un processo della pipeline per SDK v1 e SDK v2.

Prerequisiti

Creare un passaggio parallelo

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # import required libraries
    from azure.ai.ml import Input, Output
    from azure.ai.ml.constants import AssetTypes, InputOutputModes
    from azure.ai.ml.parallel import parallel_run_function, RunFunction
    
    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-1.5-ubuntu22.04-py310-cpu@latest",
        ),
    )
    

Utilizzare la fase parallela nella pipeline

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Mapping delle funzionalità chiave in SDK v1 e SDK v2

Funzionalità nell'SDK v1 Mappatura approssimativa in SDK v2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Output
dataset as_mount Input

Mappatura delle configurazioni e impostazioni dei processi paralleli

SDK v1 SDK v2 Descrizione
Ambiente di ParallelRunConfig parallel_run_function.task.environment Ambiente in cui verrà eseguito il processo di training.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Script utente che verrà eseguito in parallelo su più nodi.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Numero di errori a livello di elemento (errori di record per i dati tabulari, errori di file per i dati di file) da ignorare nell'intero input del processo. Se questo conteggio viene superato, il processo viene contrassegnato come non riuscito. -1 significa ignorare tutti gli errori degli elementi. Per la tolleranza di errore a livello di mini batch, usare mini_batch_error_threshold.
ParallelRunConfig.output_action parallel_run_function.append_row_to Aggregare tutti i valori restituiti da ogni esecuzione di mini batch e restituirlo in questo file. Può fare riferimento a uno degli output del processo parallelo usando l'espressione ${{outputs.<>output_name}}
ParallelRunConfig.node_count parallel_run_function.instance_count Numero facoltativo di istanze o nodi usati dalla destinazione di calcolo. Il valore predefinito è 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance (massima concorrenza per istanza) Parallelismo massimo di ogni istanza di calcolo.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Definire le dimensioni di ogni mini batch per suddividere l'input.

Se il input_data è una cartella o un set di file, questo numero definisce il numero di file per ogni mini batch. Ad esempio, 10, 100.

Se il input_data è dati tabulari da mltable, questo numero definisce le dimensioni fisiche prossime per ogni mini batch. L'unità predefinita è Byte e il valore può accettare una stringa come 100 kb, 100 mb.
ParallelRunConfig.source_directory parallel_run_function.task.code Percorso locale o remoto che punta al codice sorgente.
ParallelRunConfig.descrizione descrizione_funzione_di_esecuzione_parallela Descrizione amichevole del parallelo
ParallelRunConfig.logging_level parallel_run_function.logging_level Stringa del nome del livello di registrazione, come definito nel modulo 'logging'. I valori possibili sono 'WARNING', 'INFO' e 'DEBUG'. (facoltativo, il valore predefinito è 'INFO'). Questo valore può essere impostato tramite PipelineParameter.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout Il timeout in secondi per l'esecuzione della funzione run() personalizzata. Se il tempo di esecuzione è superiore a questa soglia, il mini batch verrà interrotto e contrassegnato come mini-batch non riuscito per attivare nuovi tentativi.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.massimo_numero_riprovi Numero di tentativi quando il mini-batch non riesce o va in timeout. Se tutti i tentativi non riescono, il mini-batch verrà contrassegnato come non riuscito e considerato nel calcolo di mini_batch_error_threshold.
ParallelRunConfig.append_row_file_name funzione_di_esecuzione_parallela.aggiungi_riga_a Combinato con l'impostazione append_row_to.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Numero di mini batch non riusciti che potrebbero essere ignorati in questo processo parallelo. Se il numero di mini batch non riuscito è superiore a questa soglia, il processo parallelo verrà contrassegnato come non riuscito.

"-1" è il numero predefinito, ovvero ignorare tutti i mini batch non riusciti durante il processo parallelo.
ConfigurazioneEsecuzioneParallela.percentuale_perdita_consentita parallel_run_function.task.program_arguments impostato
--allowed_failed_percent
Analogamente a "allowed_failed_count", ma questa impostazione usa la percentuale di mini batch non riusciti anziché il numero di errori del mini batch.

L'intervallo di questa impostazione è [0, 100]. "100" è il numero predefinito, ovvero ignorare tutti i mini batch non riusciti durante il processo parallelo.
ParallelRunConfig.chiavi_di_partizione parallel_run_function.partition_keys Chiavi usate per partizionare il set di dati in mini batch. I dati con la stessa chiave vengono inseriti nello stesso mini batch. Se vengono specificati sia partition_keys che mini_batch_size, le chiavi di partizione diventano effettive.
ParallelRunConfig.environment_variables parallel_run_function.variabili_di_ambiente Dizionario di nomi e valori delle variabili di ambiente. Queste variabili di ambiente vengono impostate nel processo in cui viene eseguito lo script utente.
ParallelRunStep.name parallel_run_function.name Nome del processo o del componente parallelo creato.
ParallelRunStep.inputs parallel_run_function.inputs Un dizionario di input usato da questo processo parallelo.
-- parallel_run_function.input_data Dichiarare i dati da suddividere ed elaborare parallelamente
ParallelRunStep.output parallel_run_function.outputs Output di questo processo parallelo.
ParallelRunStep.side_inputs parallel_run_function.inputs Definito insieme a inputs.
ParallelRunStep.arguments parallel_run_function.task.argomenti_programma Argomenti dell'attività parallela.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Indicare se il parallelo restituirà l'output identico per lo stesso input.

Passaggi successivi

Per altre informazioni, vedere la documentazione qui: