Patrón de monitoreo

El patrón de supervisión es un proceso periódico en un flujo de trabajo que sondea un sistema externo hasta que se cumple una condición, por ejemplo, comprobando el estado del trabajo hasta que se complete o viendo los datos meteorológicos hasta que los cielos estén claros. A diferencia de un disparador de temporizador de programación fija, un monitor espera entre iteraciones (evitando la superposición), soporta intervalos dinámicos y puede finalizar por sí mismo una vez que se cumpla la condición o expire un tiempo de espera.

En este artículo se explica cómo implementar el patrón de supervisión mediante orquestaciones duraderas.

Tip

En este artículo se muestra la implementación completa. Para obtener información general conceptual sobre los casos de uso de orquestación duradera, consulte ¿Qué es Durable Task?.

Los ejemplos de Durable Functions incluyen un escenario de supervisión meteorológica (C#/JavaScript) y un escenario de supervisión de problemas de GitHub (Python).

Nota:

La versión 4 del modelo de programación de Node.js para Azure Functions está disponible con carácter general. El modelo v4 está diseñado para proporcionar una experiencia más flexible e intuitiva para los desarrolladores de JavaScript y TypeScript. Para obtener más información sobre las diferencias entre v3 y v4, consulte la guía de migración.

En los siguientes fragmentos de código, JavaScript (PM4) denota el modelo de programación v4, la nueva experiencia.

En el ejemplo de SDK de Durable Task se muestra la supervisión del estado del trabajo con intervalos de sondeo configurables mediante .NET, JavaScript, Python y Java.

Prerrequisitos

  • SDK de .NET 8.0 o posterior
  • Acceso al Planificador de Tareas Durables de Azure o al emulador local

Información general sobre el escenario de supervisión

En este ejemplo se supervisan las condiciones meteorológicas actuales de una ubicación y alerta al usuario por SMS cuando el cielo está despejado. Puede utilizar una función regular desencadenada por temporizador para comprobar el tiempo y enviar alertas. Sin embargo, un problema con este enfoque es la gestión del ciclo de vida. Si solo se debe enviar una sola alerta, el monitor necesita desactivarse después de que se detecte un clima despejado.

El patrón de supervisión puede finalizar su propia ejecución, entre otras ventajas:

  • Las supervisiones se ejecutan en intervalos, no en programaciones: un desencadenador en temporizador se ejecuta cada hora; una supervisión espera una hora entre acciones. Las acciones de un monitor no se superponen a menos que se especifique, lo que puede ser importante para las tareas de larga duración.
  • Las supervisiones pueden tener intervalos dinámicos: el tiempo de espera puede cambiar en función de alguna condición.
  • Las supervisiones pueden finalizar cuando se cumple alguna condición o cuando otro proceso las finaliza.
  • Los monitores pueden aceptar parámetros. En el ejemplo se muestra cómo se puede aplicar el mismo proceso de supervisión a cualquier ubicación solicitada, número de teléfono o repositorio.
  • Los monitores son escalables. Debido a que cada supervisión es una instancia de orquestación, se pueden crear múltiples supervisiones sin tener que crear nuevas funciones ni definir más código.
  • Los monitores se integran fácilmente en flujos de trabajo mayores. Una supervisión puede ser una sección de una función de orquestación más compleja, o una suborquestación.

Este ejemplo supervisa el estado de un trabajo de ejecución prolongada y devuelve el resultado final cuando el trabajo finaliza o se agota el tiempo de espera. Puede usar un bucle de sondeo normal para comprobar el estado del trabajo, pero este método tiene limitaciones en cuanto a la gestión de la vida útil y la fiabilidad.

El patrón de supervisión proporciona estas ventajas:

  • Sondeo duradero: la orquestación sobrevive a los reinicios del proceso y puede seguir supervisando aunque haya errores.
  • Intervalos configurables: el tiempo de espera entre las comprobaciones de estado se puede ajustar dinámicamente.
  • Compatibilidad con tiempo de espera: la supervisión puede finalizar cuando se cumple una condición o termina un tiempo de espera.
  • Visibilidad del estado: los clientes pueden consultar el estado personalizado de la orquestación para ver el progreso de la supervisión actual.
  • Escalabilidad: varios monitores pueden ejecutarse simultáneamente, cada uno de ellos realiza un seguimiento de trabajos diferentes.

Configuración

Configuración de la integración de Twilio

En este ejemplo se usa el servicio Twilio para enviar mensajes SMS a un teléfono móvil. Azure Functions ya tiene compatibilidad con Twilio a través del enlace Twilio y el ejemplo usa esa característica.

Lo primero que necesita es una cuenta de Twilio. Puede crear una gratis en https://www.twilio.com/try-twilio. Cuando tenga una cuenta, agregue las tres configuraciones de app a su aplicación de funciones.

Nombre de la configuración de la aplicación Descripción del valor
TwilioAccountSid Identificador de seguridad de la cuenta de Twilio
TwilioAuthToken Token de autenticación de la cuenta de Twilio
TwilioPhoneNumber Número de teléfono asociado a la cuenta de Twilio, que se utiliza para enviar mensajes SMS.

Configuración de una API meteorológica

Los ejemplos de C#/JavaScript llaman a una API meteorológica para comprobar las condiciones actuales. Debe proporcionar su propia clave de API meteorológica y actualizar el código de ejemplo en consecuencia. El código de ejemplo hace referencia a una WeatherUndergroundApiKey configuración de aplicación: reemplace esto por la clave del proveedor meteorológico elegido.

Nombre de la configuración de la aplicación Descripción del valor
WeatherUndergroundApiKey Tu clave API de clima (reemplaza con el nombre de clave de tu proveedor según sea necesario).

Orquestador

[FunctionName("E3_Monitor")]
public static async Task Run([OrchestrationTrigger] IDurableOrchestrationContext monitorContext, ILogger log)
{
    MonitorRequest input = monitorContext.GetInput<MonitorRequest>();
    if (!monitorContext.IsReplaying) { log.LogInformation($"Received monitor request. Location: {input?.Location}. Phone: {input?.Phone}."); }

    VerifyRequest(input);

    DateTime endTime = monitorContext.CurrentUtcDateTime.AddHours(6);
    if (!monitorContext.IsReplaying) { log.LogInformation($"Instantiating monitor for {input.Location}. Expires: {endTime}."); }

    while (monitorContext.CurrentUtcDateTime < endTime)
    {
        // Check the weather
        if (!monitorContext.IsReplaying) { log.LogInformation($"Checking current weather conditions for {input.Location} at {monitorContext.CurrentUtcDateTime}."); }

        bool isClear = await monitorContext.CallActivityAsync<bool>("E3_GetIsClear", input.Location);

        if (isClear)
        {
            // It's not raining! Or snowing. Or misting. Tell our user to take advantage of it.
            if (!monitorContext.IsReplaying) { log.LogInformation($"Detected clear weather for {input.Location}. Notifying {input.Phone}."); }

            await monitorContext.CallActivityAsync("E3_SendGoodWeatherAlert", input.Phone);
            break;
        }
        else
        {
            // Wait for the next checkpoint
            var nextCheckpoint = monitorContext.CurrentUtcDateTime.AddMinutes(30);
            if (!monitorContext.IsReplaying) { log.LogInformation($"Next check for {input.Location} at {nextCheckpoint}."); }

            await monitorContext.CreateTimer(nextCheckpoint, CancellationToken.None);
        }
    }

    log.LogInformation($"Monitor expiring.");
}

[Deterministic]
private static void VerifyRequest(MonitorRequest request)
{
    if (request == null)
    {
        throw new ArgumentNullException(nameof(request), "An input object is required.");
    }

    if (request.Location == null)
    {
        throw new ArgumentNullException(nameof(request.Location), "A location input is required.");
    }

    if (string.IsNullOrEmpty(request.Phone))
    {
        throw new ArgumentNullException(nameof(request.Phone), "A phone number input is required.");
    }
}

La función del orquestador requiere una ubicación para supervisar y un número de teléfono al que enviar un mensaje cuando el tiempo aparece despejado en la ubicación. Estos datos se pasan a la función del orquestador como un objeto MonitorRequest fuertemente tipado.

Esta función de orquestador realiza las acciones siguientes:

  1. Obtiene el MonitorRequest que consta de la ubicación a supervisar y el número de teléfono al que envía una notificación SMS (o repo para el ejemplo de Python).
  2. Determina el tiempo de expiración del monitor. El ejemplo utiliza un valor codificado de forma fija para brevedad.
  3. Llama a la actividad de comprobación de estado para determinar si se cumple la condición.
  4. Si se cumple la condición, llama a la actividad de alerta para enviar una notificación.
  5. Crea un temporizador duradero para reanudar la orquestación en el siguiente intervalo de sondeo. El ejemplo utiliza un valor codificado de forma fija para brevedad.
  6. Continúa ejecutándose hasta que la hora universal coordinada actual pasa la hora de expiración de la supervisión o se envía una alerta.

Varias instancias de función de orquestador se pueden ejecutar simultáneamente mediante una llamada a la función de orquestador varias veces. Se puede especificar la ubicación que se va a supervisar y el número de teléfono para enviar una alerta. La función del orquestador no se está ejecutando mientras espera el temporizador, por lo que no se le cobrará.

El orquestador comprueba periódicamente el estado de un trabajo y devuelve cuando el trabajo se completa o agota el tiempo de espera.

using Microsoft.DurableTask;
using System;
using System.Threading.Tasks;

[DurableTask(nameof(MonitoringJobOrchestration))]
public class MonitoringJobOrchestration : TaskOrchestrator<JobMonitorInput, JobMonitorResult>
{
    public override async Task<JobMonitorResult> RunAsync(
        TaskOrchestrationContext context, JobMonitorInput input)
    {
        var jobId = input.JobId;
        var pollingInterval = TimeSpan.FromSeconds(input.PollingIntervalSeconds);
        var expirationTime = context.CurrentUtcDateTime.AddSeconds(input.TimeoutSeconds);

        // Initialize monitoring state
        int checkCount = 0;

        while (context.CurrentUtcDateTime < expirationTime)
        {
            // Check current job status
            var jobStatus = await context.CallActivityAsync<JobStatus>(
                nameof(CheckJobStatusActivity),
                new CheckJobInput { JobId = jobId, CheckCount = checkCount });

            checkCount = jobStatus.CheckCount;

            // Make job status available via custom status
            context.SetCustomStatus(jobStatus);

            if (jobStatus.Status == "Completed")
            {
                return new JobMonitorResult
                {
                    JobId = jobId,
                    FinalStatus = "Completed",
                    ChecksPerformed = checkCount
                };
            }

            // Calculate next check time
            var nextCheck = context.CurrentUtcDateTime.Add(pollingInterval);
            if (nextCheck > expirationTime)
            {
                nextCheck = expirationTime;
            }

            // Wait until next polling interval
            await context.CreateTimer(nextCheck, default);
        }

        // Timeout reached
        return new JobMonitorResult
        {
            JobId = jobId,
            FinalStatus = "Timeout",
            ChecksPerformed = checkCount
        };
    }
}

Este orquestador realiza las siguientes acciones:

  1. Toma el identificador de trabajo, el intervalo de sondeo y el tiempo de espera como parámetros de entrada.
  2. Registra la hora de inicio y calcula la hora de expiración.
  3. Entra en un bucle de sondeo que comprueba el estado del proceso.
  4. Actualiza el estado personalizado para que los clientes puedan supervisar el progreso.
  5. Si el trabajo se completa, el sistema devuelve el resultado final.
  6. Si se alcanza el tiempo de espera, devuelve un estado de tiempo de espera.
  7. Usa CreateTimer para esperar entre intentos de sondeo sin consumir recursos.

Actividades

Al igual que otros ejemplos, las funciones auxiliares de actividad son básicamente funciones normales que usan el enlace del desencadenador activityTrigger.

Actividad de comprobación de estado

La función E3_GetIsClear obtiene las condiciones meteorológicas actuales mediante la API Weather Underground y determina si el cielo está claro.

[FunctionName("E3_GetIsClear")]
public static async Task<bool> GetIsClear([ActivityTrigger] Location location)
{
    var currentConditions = await WeatherUnderground.GetCurrentConditionsAsync(location);
    return currentConditions.Equals(WeatherCondition.Clear);
}

Actividad de alerta

La función E3_SendGoodWeatherAlert usa el enlace de Twilio para enviar un mensaje SMS que notifica al usuario final que es un buen momento para un paseo.

    [FunctionName("E3_SendGoodWeatherAlert")]
    public static void SendGoodWeatherAlert(
        [ActivityTrigger] string phoneNumber,
        ILogger log,
        [TwilioSms(AccountSidSetting = "TwilioAccountSid", AuthTokenSetting = "TwilioAuthToken", From = "%TwilioPhoneNumber%")]
            out CreateMessageOptions message)
    {
        message = new CreateMessageOptions(new PhoneNumber(phoneNumber));
        message.Body = $"The weather's clear outside! Go take a walk!";
    }

internal class WeatherUnderground
{
    private static readonly HttpClient httpClient = new HttpClient();
    private static IReadOnlyDictionary<string, WeatherCondition> weatherMapping = new Dictionary<string, WeatherCondition>()
    {
        { "Clear", WeatherCondition.Clear },
        { "Overcast", WeatherCondition.Clear },
        { "Cloudy", WeatherCondition.Clear },
        { "Clouds", WeatherCondition.Clear },
        { "Drizzle", WeatherCondition.Precipitation },
        { "Hail", WeatherCondition.Precipitation },
        { "Ice", WeatherCondition.Precipitation },
        { "Mist", WeatherCondition.Precipitation },
        { "Precipitation", WeatherCondition.Precipitation },
        { "Rain", WeatherCondition.Precipitation },
        { "Showers", WeatherCondition.Precipitation },
        { "Snow", WeatherCondition.Precipitation },
        { "Spray", WeatherCondition.Precipitation },
        { "Squall", WeatherCondition.Precipitation },
        { "Thunderstorm", WeatherCondition.Precipitation },
    };

    internal static async Task<WeatherCondition> GetCurrentConditionsAsync(Location location)
    {
        var apiKey = Environment.GetEnvironmentVariable("WeatherUndergroundApiKey");
        if (string.IsNullOrEmpty(apiKey))
        {
            throw new InvalidOperationException("The WeatherUndergroundApiKey environment variable was not set.");
        }

        var callString = string.Format("http://api.wunderground.com/api/{0}/conditions/q/{1}/{2}.json", apiKey, location.State, location.City);
        var response = await httpClient.GetAsync(callString);
        var conditions = await response.Content.ReadAsAsync<JObject>();

        JToken currentObservation;
        if (!conditions.TryGetValue("current_observation", out currentObservation))
        {
            JToken error = conditions.SelectToken("response.error");

            if (error != null)
            {
                throw new InvalidOperationException($"API returned an error: {error}.");
            }
            else
            {
                throw new ArgumentException("Could not find weather for this location. Try being more specific.");
            }
        }

        return MapToWeatherCondition((string)(currentObservation as JObject).GetValue("weather"));
    }

    private static WeatherCondition MapToWeatherCondition(string weather)
    {
        foreach (var pair in weatherMapping)
        {
            if (weather.Contains(pair.Key))
            {
                return pair.Value;
            }
        }

        return WeatherCondition.Other;
    }
}

Nota:

Tendrá que instalar el paquete Nuget Microsoft.Azure.WebJobs.Extensions.Twilio para ejecutar el código de ejemplo.

La actividad comprueba el estado actual del trabajo. En una aplicación real, esto llamaría a una API o servicio externo.

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

[DurableTask(nameof(CheckJobStatusActivity))]
public class CheckJobStatusActivity : TaskActivity<CheckJobInput, JobStatus>
{
    private readonly ILogger<CheckJobStatusActivity> _logger;

    public CheckJobStatusActivity(ILogger<CheckJobStatusActivity> logger)
    {
        _logger = logger;
    }

    public override Task<JobStatus> RunAsync(TaskActivityContext context, CheckJobInput input)
    {
        _logger.LogInformation("Checking status for job: {JobId} (check #{CheckCount})",
            input.JobId, input.CheckCount + 1);

        // Simulate job status - completes after 3 checks
        var status = input.CheckCount >= 3 ? "Completed" : "Running";

        return Task.FromResult(new JobStatus
        {
            JobId = input.JobId,
            Status = status,
            CheckCount = input.CheckCount + 1,
            LastCheckTime = DateTime.UtcNow
        });
    }
}

// Data classes
public class JobMonitorInput
{
    public string JobId { get; set; }
    public int PollingIntervalSeconds { get; set; } = 5;
    public int TimeoutSeconds { get; set; } = 30;
}

public class CheckJobInput
{
    public string JobId { get; set; }
    public int CheckCount { get; set; }
}

public class JobStatus
{
    public string JobId { get; set; }
    public string Status { get; set; }
    public int CheckCount { get; set; }
    public DateTime LastCheckTime { get; set; }
}

public class JobMonitorResult
{
    public string JobId { get; set; }
    public string FinalStatus { get; set; }
    public int ChecksPerformed { get; set; }
}

Ejecutar el ejemplo de supervisión

Con las funciones desencadenadas mediante HTTP del ejemplo, puede iniciar la orquestación con el envío de la siguiente solicitud HTTP POST:

POST https://{host}/orchestrators/E3_Monitor
Content-Length: 77
Content-Type: application/json

{ "location": { "city": "Redmond", "state": "WA" }, "phone": "+1425XXXXXXX" }
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Location: https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635?taskHub=SampleHubVS&connection=Storage&code={SystemKey}
RetryAfter: 10

{"id": "f6893f25acf64df2ab53a35c09d52635", "statusQueryGetUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635?taskHub=SampleHubVS&connection=Storage&code={systemKey}", "sendEventPostUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/raiseEvent/{eventName}?taskHub=SampleHubVS&connection=Storage&code={systemKey}", "terminatePostUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/terminate?reason={text}&taskHub=SampleHubVS&connection=Storage&code={systemKey}"}

La instancia de E3_Monitor inicia y consulta las condiciones actuales. Si se cumple la condición, llama a una función de actividad para enviar una alerta; de lo contrario, establece un temporizador. Cuando expire el temporizador, se reanuda la orquestación.

Puede ver los resultados de la actividad de orquestación al examinar los registros de la función en el portal de Azure Functions.

La orquestación se completa al alcanzar su tiempo de espera o al detectar la condición. También puede usar la terminate API dentro de otra función o invocar el webhook terminatePostUri HTTP POST al que se hace referencia en la respuesta 202 anterior. Para usar el webhook, reemplace {text} por el motivo de la terminación temprana. La dirección URL de HTTP POST tiene un aspecto aproximado al siguiente:

POST https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/terminate?reason=Because&taskHub=SampleHubVS&connection=Storage&code={systemKey}

Para ejecutar el ejemplo, necesita lo siguiente:

  1. Inicie el emulador del Programador de tareas durables (para el desarrollo local):

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. Inicie el proceso de trabajo para registrar el orquestador y las actividades.

  3. Ejecute el cliente para programar una orquestación de supervisión.

using System;
using System.Threading.Tasks;

var client = DurableTaskClientBuilder.UseDurableTaskScheduler(connectionString).Build();

// Schedule the monitoring orchestration
var input = new JobMonitorInput
{
    JobId = "job-" + Guid.NewGuid().ToString(),
    PollingIntervalSeconds = 5,
    TimeoutSeconds = 30
};

string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(MonitoringJobOrchestration), input);

Console.WriteLine($"Started monitoring orchestration: {instanceId}");

// Wait for completion while checking status
while (true)
{
    var state = await client.GetInstanceMetadataAsync(instanceId, getInputsAndOutputs: true);

    if (state.RuntimeStatus == OrchestrationRuntimeStatus.Completed ||
        state.RuntimeStatus == OrchestrationRuntimeStatus.Failed)
    {
        Console.WriteLine($"Monitoring completed: {state.ReadOutputAs<JobMonitorResult>().FinalStatus}");
        break;
    }

    Console.WriteLine($"Current status: {state.ReadCustomStatusAs<JobStatus>()?.Status}");
    await Task.Delay(2000);
}

Pasos siguientes

En este ejemplo se muestra cómo usar Durable Functions para supervisar el estado de un origen externo mediante temporizadoresdurables y lógica condicional. En el ejemplo siguiente se muestra cómo usar eventos externos y temporizadores duraderos para controlar la interacción humana.

En este ejemplo se muestra cómo usar los SDK de Durable Task para implementar el patrón de supervisión con temporizadores duraderos y seguimiento de estado. Obtenga más información sobre otros patrones y características.