監視パターンは、条件が満たされるまで外部システムをポーリングするワークフローの繰り返しのプロセスです。たとえば、ジョブの状態が完了するまで確認したり、空が晴れるまで気象データを監視したりします。 固定スケジュール タイマー トリガーとは異なり、モニターはイテレーション間で待機し (重複を回避)、動的な間隔をサポートし、条件が満たされるかタイムアウトが切れると、それ自体を終了できます。
この記事では、永続的オーケストレーションを使用してモニター パターンを実装する方法について説明します。
Tip
この記事では、完全な実装について説明します。 持続的オーケストレーションのユース ケースの概念の概要については、「 Durable Task とは」を参照してください。
Durable Functionsの例には、気象監視シナリオ (C#/JavaScript) とGitHub問題の監視シナリオ (Python) が含まれます。
注
Azure Functions の Node.js プログラミング モデルのバージョン 4 が一般公開されています。 v4 モデルは、JavaScript および TypeScript 開発者に、より柔軟で直感的なエクスペリエンスを提供するように設計されています。 v3 と v4 の違いの詳細については、 移行ガイドを参照してください。
次のコード スニペットでは、JavaScript (PM4) は、新しいエクスペリエンスであるプログラミング モデル v4 を示しています。
Durable Task SDK の例では、.NET、JavaScript、Python、およびJavaを使用して、構成可能なポーリング間隔でジョブの状態を監視する方法を示します。
前提条件
このシナリオでは、Durable Functions用の PowerShell サンプルはまだ使用できません。
Durable FunctionsのJavaサンプルは、このシナリオではまだ使用できません。 Durable Task SDK のピボットを参照してください。
- .NET 8.0 SDK 以降
- Azure Durable Task Scheduler かローカル エミュレーターにアクセスする
- Node.js 22 以降
- Azure Durable Task Scheduler かローカル エミュレーターにアクセスする
- Python 3.9 以降
- Azure Durable Task Scheduler かローカル エミュレーターにアクセスする
このサンプルは、.NET、JavaScript、Java、およびPythonについて示されています。
- Java 11 以降
- Azure Durable Task Scheduler かローカル エミュレーターにアクセスする
監視シナリオの概要
このサンプルでは、ある場所の現在の気象条件を監視し、空が晴れたときに SMS でユーザーに通知します。 定期的にタイマーでトリガーされる関数を使って、天気を確認し、アラートを送信できます。 ただし、この方法の 1 つの問題は 、有効期間管理です。 送信する必要があるアラートが 1 つだけの場合は、晴天が検出された後でモニターを無効にする必要があります。
このサンプルでは、ある場所の現在の気象条件を監視し、空が晴れたときに SMS でユーザーに通知します。 定期的にタイマーでトリガーされる関数を使って、天気を確認し、アラートを送信できます。 ただし、この方法の 1 つの問題は 、有効期間管理です。 送信する必要があるアラートが 1 つだけの場合は、晴天が検出された後でモニターを無効にする必要があります。
このサンプルでは、GitHub リポジトリ内の問題の数を監視し、3 つ以上の未解決の問題がある場合にユーザーに警告します。 タイマーによって定期的にトリガーされる関数を使用して、オープンされている issue の数を一定の間隔で要求できます。 ただし、この方法の 1 つの問題は 、有効期間管理です。 アラートを 1 つだけ送信する必要がある場合は、3 つ以上の問題が検出された後、モニター自体を無効にする必要があります。
このシナリオでは、Durable Functions用の PowerShell サンプルはまだ使用できません。
Durable FunctionsのJavaサンプルは、このシナリオではまだ使用できません。 Durable Task SDK のピボットを参照してください。
特にメリットがあるのは、監視パターンがそれ自体の実行を終了できることです。
- モニターはスケジュールではなく間隔で実行されます。タイマー トリガーは 1 時間ごとに 実行されます 。モニターはアクションの間に 1 時間 待機します 。 モニターのアクションは、指定されない限りは重複しません。これは実行時間の長いタスクで重要となる可能性があります。
- モニターの間隔は動的にすることができます。待機時間は、いくつかの条件に基づいて変化することがあります。
- モニターは、ある条件が満たされたときに終了することも、別のプロセスによって終了することもできます。
- モニターは、パラメーターを受け取ることができます。 このサンプルでは、要求された場所、電話番号、またはリポジトリに同じ監視プロセスを適用する方法を示します。
- モニターはスケーラブルです。 各モニターはオーケストレーション インスタンスであるため、新しい関数を作成したり、コードをさらに定義したりしなくても、複数のモニターを作成できます。
- モニターは、より大規模なワークフローと簡単に統合できます。 モニターには、より複雑なオーケストレーション関数の 1 つのセクションまたは サブオーケストレーションを指定できます。
このサンプルでは、実行時間の長いジョブの状態を監視し、ジョブが完了またはタイムアウトしたときに最終的な結果を返します。通常のポーリング ループを使用してジョブの状態を確認できますが、この方法には 有効期間の管理 と 信頼性に関する制限があります。
監視パターンには、次の利点があります。
-
永続的なポーリング: オーケストレーションは、プロセスの再起動後も存続し、障害が発生した後も引き続き監視を継続できます。
-
構成可能な間隔: 状態チェック間の待機時間を動的に調整できます。
-
タイムアウトのサポート: モニターは、条件が満たされたとき、またはタイムアウトの有効期限が切れたときに終了できます。
-
状態の可視性: クライアントは、オーケストレーションのカスタム状態に対してクエリを実行して、現在の監視の進行状況を確認できます。
-
スケーラビリティ: 複数のモニターを同時に実行し、それぞれ異なるジョブを追跡できます。
コンフィギュレーション
Twilio 統合の構成
このサンプルでは、Twilio サービスを使って、携帯電話に SMS メッセージを送信します。 Azure Functionsは既に Twilio バインディングを介して Twilio をサポートしており、このサンプルではその機能を使用しています。
最初に必要なものは Twilio アカウントです。
https://www.twilio.com/try-twilio で無料で作成できます。 アカウントを作成したら、次の 3 つのアプリ設定を関数アプリに追加します。
| アプリ設定の名前 |
値の説明 |
|
TwilioAccountSid |
Twilio アカウントの SID |
|
TwilioAuthToken |
Twilio アカウントの認証トークン |
|
TwilioPhoneNumber |
Twilio アカウントに関連付けられている電話番号。 これは、SMS メッセージの送信に使われます。 |
Weather API の構成
C#/JavaScript サンプルでは、天気 API を呼び出して現在の状態を確認します。 独自の Weather API キーを指定し、それに応じてサンプル コードを更新する必要があります。 サンプル コードでは、 WeatherUndergroundApiKey アプリ設定を参照しています。これを、選択した気象プロバイダーのキーに置き換えます。
| アプリ設定の名前 |
値の説明 |
|
WeatherUndergroundApiKey |
Weather API キー (必要に応じてプロバイダーのキー名に置き換えます)。 |
Twilio 統合の構成
このサンプルでは、Twilio サービスを使って、携帯電話に SMS メッセージを送信します。 Azure Functionsは既に Twilio バインディングを介して Twilio をサポートしており、このサンプルではその機能を使用しています。
最初に必要なものは Twilio アカウントです。
https://www.twilio.com/try-twilio で無料で作成できます。 アカウントを作成したら、次の 3 つのアプリ設定を関数アプリに追加します。
| アプリ設定の名前 |
値の説明 |
|
TwilioAccountSid |
Twilio アカウントの SID |
|
TwilioAuthToken |
Twilio アカウントの認証トークン |
|
TwilioPhoneNumber |
Twilio アカウントに関連付けられている電話番号。 これは、SMS メッセージの送信に使われます。 |
Weather API の構成
C#/JavaScript サンプルでは、天気 API を呼び出して現在の状態を確認します。 独自の Weather API キーを指定し、それに応じてサンプル コードを更新する必要があります。 サンプル コードでは、 WeatherUndergroundApiKey アプリ設定を参照しています。これを、選択した気象プロバイダーのキーに置き換えます。
| アプリ設定の名前 |
値の説明 |
|
WeatherUndergroundApiKey |
あなたの天気APIキー(必要に応じて、プロバイダーのキー名に置き換えてください)。 |
Twilio 統合の構成
このサンプルでは、Twilio サービスを使って、携帯電話に SMS メッセージを送信します。 Azure Functionsは既に Twilio バインディングを介して Twilio をサポートしており、このサンプルではその機能を使用しています。
最初に必要なものは Twilio アカウントです。
https://www.twilio.com/try-twilio で無料で作成できます。 アカウントを作成したら、次の 3 つのアプリ設定を関数アプリに追加します。
| アプリ設定の名前 |
値の説明 |
|
TwilioAccountSid |
Twilio アカウントの SID |
|
TwilioAuthToken |
Twilio アカウントの認証トークン |
|
TwilioPhoneNumber |
Twilio アカウントに関連付けられている電話番号。 これは、SMS メッセージの送信に使われます。 |
このシナリオでは、Durable Functions用の PowerShell サンプルはまだ使用できません。
Durable FunctionsのJavaサンプルは、このシナリオではまだ使用できません。 Durable Task SDK のピボットを参照してください。
Orchestrator
[FunctionName("E3_Monitor")]
public static async Task Run([OrchestrationTrigger] IDurableOrchestrationContext monitorContext, ILogger log)
{
MonitorRequest input = monitorContext.GetInput<MonitorRequest>();
if (!monitorContext.IsReplaying) { log.LogInformation($"Received monitor request. Location: {input?.Location}. Phone: {input?.Phone}."); }
VerifyRequest(input);
DateTime endTime = monitorContext.CurrentUtcDateTime.AddHours(6);
if (!monitorContext.IsReplaying) { log.LogInformation($"Instantiating monitor for {input.Location}. Expires: {endTime}."); }
while (monitorContext.CurrentUtcDateTime < endTime)
{
// Check the weather
if (!monitorContext.IsReplaying) { log.LogInformation($"Checking current weather conditions for {input.Location} at {monitorContext.CurrentUtcDateTime}."); }
bool isClear = await monitorContext.CallActivityAsync<bool>("E3_GetIsClear", input.Location);
if (isClear)
{
// It's not raining! Or snowing. Or misting. Tell our user to take advantage of it.
if (!monitorContext.IsReplaying) { log.LogInformation($"Detected clear weather for {input.Location}. Notifying {input.Phone}."); }
await monitorContext.CallActivityAsync("E3_SendGoodWeatherAlert", input.Phone);
break;
}
else
{
// Wait for the next checkpoint
var nextCheckpoint = monitorContext.CurrentUtcDateTime.AddMinutes(30);
if (!monitorContext.IsReplaying) { log.LogInformation($"Next check for {input.Location} at {nextCheckpoint}."); }
await monitorContext.CreateTimer(nextCheckpoint, CancellationToken.None);
}
}
log.LogInformation($"Monitor expiring.");
}
[Deterministic]
private static void VerifyRequest(MonitorRequest request)
{
if (request == null)
{
throw new ArgumentNullException(nameof(request), "An input object is required.");
}
if (request.Location == null)
{
throw new ArgumentNullException(nameof(request.Location), "A location input is required.");
}
if (string.IsNullOrEmpty(request.Phone))
{
throw new ArgumentNullException(nameof(request.Phone), "A phone number input is required.");
}
}
オーケストレーター関数には、監視する場所と、その場所で天気が明らかになったときにメッセージを送信する電話番号が必要です。 このデータは、厳密に型指定された MonitorRequest オブジェクトとしてオーケストレーター関数に渡されます。
E3_Monitor関数は、オーケストレーター関数に標準のfunction.json を使用します。
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"disabled": false
}
この関数を実装するコードを次に示します。
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function* (context) {
const input = context.df.getInput();
context.log(
"Received monitor request. location: " +
(input ? input.location : undefined) +
". phone: " +
(input ? input.phone : undefined) +
"."
);
verifyRequest(input);
const endTime = moment.utc(context.df.currentUtcDateTime).add(6, "h");
context.log(
"Instantiating monitor for " +
input.location.city +
", " +
input.location.state +
". Expires: " +
endTime +
"."
);
while (moment.utc(context.df.currentUtcDateTime).isBefore(endTime)) {
// Check the weather
context.log(
"Checking current weather conditions for " +
input.location.city +
", " +
input.location.state +
" at " +
context.df.currentUtcDateTime +
"."
);
const isClear = yield context.df.callActivity("E3_GetIsClear", input.location);
if (isClear) {
// It's not raining! Or snowing. Or misting. Tell our user to take advantage of it.
context.log(
"Detected clear weather for " +
input.location.city +
", " +
input.location.state +
". Notifying " +
input.phone +
"."
);
yield context.df.callActivity("E3_SendGoodWeatherAlert", input.phone);
break;
} else {
// Wait for the next checkpoint
const nextCheckpoint = moment.utc(context.df.currentUtcDateTime).add(30, "s");
context.log(
"Next check for " +
input.location.city +
", " +
input.location.state +
" at " +
nextCheckpoint.toString()
);
yield context.df.createTimer(nextCheckpoint.toDate()); // accomodate cancellation tokens
}
}
context.log("Monitor expiring.");
});
function verifyRequest(request) {
if (!request) {
throw new Error("An input object is required.");
}
if (!request.location) {
throw new Error("A location input is required.");
}
if (!request.phone) {
throw new Error("A phone number input is required.");
}
}
E3_Monitor関数は、オーケストレーター関数に標準のfunction.json を使用します。
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
関数を実装するコードを次に示します。
import azure.durable_functions as df
from datetime import timedelta
from typing import Dict
def orchestrator_function(context: df.DurableOrchestrationContext):
monitoring_request: Dict[str, str] = context.get_input()
repo_url: str = monitoring_request["repo"]
phone: str = monitoring_request["phone"]
# Expiration of the repo monitoring
expiry_time = context.current_utc_datetime + timedelta(minutes=5)
while context.current_utc_datetime < expiry_time:
# Count the number of issues in the repo (the GitHub API caps at 30 issues per page)
too_many_issues = yield context.call_activity("E3_TooManyOpenIssues", repo_url)
# If we detect too many issues, we text the provided phone number
if too_many_issues:
# Extract URLs of GitHub issues, and return them
yield context.call_activity("E3_SendAlert", phone)
break
else:
# Reporting the number of statuses found
status = f"The repository does not have too many issues, for now ..."
context.set_custom_status(status)
# Schedule a new "wake up" signal
next_check = context.current_utc_datetime + timedelta(minutes=1)
yield context.create_timer(next_check)
return "Monitor completed!"
main = df.Orchestrator.create(orchestrator_function)
このサンプルでは使用できません。 Javaカバレッジについては、Durable Task SDK ピボットを参照してください。
@FunctionName("E3_Monitor")
public void monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
MonitorRequest input = ctx.getInput(MonitorRequest.class);
Instant expirationTime = ctx.getCurrentInstant().plus(Duration.ofHours(6));
int pollingInterval = input.getPollingIntervalSeconds();
while (ctx.getCurrentInstant().isBefore(expirationTime)) {
// Check current conditions
boolean isClear = ctx.callActivity(
"E3_GetIsClear", input.getLocation(), boolean.class).await();
if (isClear) {
// Condition met - send alert and exit
ctx.callActivity("E3_SendGoodWeatherAlert", input.getPhone()).await();
break;
}
// Wait for the next polling interval
Instant nextCheck = ctx.getCurrentInstant().plus(
Duration.ofSeconds(pollingInterval));
ctx.createTimer(nextCheck).await();
}
}
このオーケストレーター関数は、次のアクションを行います。
- 以下の構成からなるMonitorRequestを取得します: 監視対象のlocationおよびSMS通知を送信する電話番号(またはPythonの例ではrepo)。
- 監視の有効期限を決定します。 サンプルでは、簡略化のためにハード コーディングされた値を使います。
- 状態チェック アクティビティを呼び出して、条件が満たされているかどうかを判断します。
- 条件が満たされた場合は、アラート アクティビティを呼び出して通知を送信します。
- 持続的タイマーを作成して、次のポーリング間隔でオーケストレーションを再開します。 サンプルでは、簡略化のためにハード コーディングされた値を使います。
- 現在の UTC 時間がモニターの有効期限を過ぎるか、アラートが送信されるまで実行を続行します。
オーケストレーター関数インスタンスは、オーケストレーター関数を複数回呼び出すことによって同時に実行できます。 監視する場所と、アラートを送信する電話番号を指定できます。 オーケストレーター関数はタイマーの待機中に実行されていないため、課金されません。
オーケストレーターは、ジョブの状態を定期的にチェックし、ジョブが完了またはタイムアウトしたときに戻ります。
using Microsoft.DurableTask;
using System;
using System.Threading.Tasks;
[DurableTask(nameof(MonitoringJobOrchestration))]
public class MonitoringJobOrchestration : TaskOrchestrator<JobMonitorInput, JobMonitorResult>
{
public override async Task<JobMonitorResult> RunAsync(
TaskOrchestrationContext context, JobMonitorInput input)
{
var jobId = input.JobId;
var pollingInterval = TimeSpan.FromSeconds(input.PollingIntervalSeconds);
var expirationTime = context.CurrentUtcDateTime.AddSeconds(input.TimeoutSeconds);
// Initialize monitoring state
int checkCount = 0;
while (context.CurrentUtcDateTime < expirationTime)
{
// Check current job status
var jobStatus = await context.CallActivityAsync<JobStatus>(
nameof(CheckJobStatusActivity),
new CheckJobInput { JobId = jobId, CheckCount = checkCount });
checkCount = jobStatus.CheckCount;
// Make job status available via custom status
context.SetCustomStatus(jobStatus);
if (jobStatus.Status == "Completed")
{
return new JobMonitorResult
{
JobId = jobId,
FinalStatus = "Completed",
ChecksPerformed = checkCount
};
}
// Calculate next check time
var nextCheck = context.CurrentUtcDateTime.Add(pollingInterval);
if (nextCheck > expirationTime)
{
nextCheck = expirationTime;
}
// Wait until next polling interval
await context.CreateTimer(nextCheck, default);
}
// Timeout reached
return new JobMonitorResult
{
JobId = jobId,
FinalStatus = "Timeout",
ChecksPerformed = checkCount
};
}
}
import {
OrchestrationContext,
TOrchestrator,
} from "@microsoft/durabletask-js";
const monitorOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
input: { jobId: string; pollingIntervalSeconds: number; timeoutSeconds: number }
): any {
const { jobId, pollingIntervalSeconds, timeoutSeconds } = input;
const expirationTime = new Date(
ctx.currentUtcDateTime.getTime() + timeoutSeconds * 1000
);
let checkCount = 0;
while (ctx.currentUtcDateTime < expirationTime) {
// Check current job status
const jobStatus: any = yield ctx.callActivity(checkJobStatus, {
jobId,
checkCount,
});
checkCount = jobStatus.checkCount;
// Make job status available via custom status
ctx.setCustomStatus(jobStatus);
if (jobStatus.status === "Completed") {
return {
jobId,
finalStatus: "Completed",
checksPerformed: checkCount,
};
}
// Wait for next polling interval
yield ctx.createTimer(pollingIntervalSeconds);
}
// Timeout reached
return {
jobId,
finalStatus: "Timeout",
checksPerformed: checkCount,
};
};
import datetime
from durabletask import task
def monitoring_job_orchestrator(ctx: task.OrchestrationContext, job_data: dict) -> dict:
"""
Orchestrator that demonstrates the monitoring pattern.
Periodically checks the status of a job until it completes or times out.
"""
job_id = job_data.get("job_id")
polling_interval = job_data.get("polling_interval_seconds", 5)
timeout = job_data.get("timeout_seconds", 30)
# Record the start time
start_time = ctx.current_utc_datetime
expiration_time = start_time + datetime.timedelta(seconds=timeout)
# Initialize monitoring state
job_status = {
"job_id": job_id,
"status": "Unknown",
"check_count": 0
}
# Loop until the job completes or times out
while True:
# Check current job status
check_input = {"job_id": job_id, "check_count": job_status.get("check_count", 0)}
job_status = yield ctx.call_activity("check_job_status", input=check_input)
# Make the job status available via custom status
ctx.set_custom_status(job_status)
if job_status["status"] == "Completed":
break
# Check if we've hit the timeout
current_time = ctx.current_utc_datetime
if current_time >= expiration_time:
job_status["status"] = "Timeout"
break
# Calculate next check time
next_check_time = current_time + datetime.timedelta(seconds=polling_interval)
if next_check_time > expiration_time:
next_check_time = expiration_time
# Wait until next polling interval
yield ctx.create_timer(next_check_time)
# Return the final status
return {
"job_id": job_id,
"final_status": job_status["status"],
"checks_performed": job_status["check_count"]
}
このサンプルは、.NET、JavaScript、Java、およびPythonについて示されています。
import com.microsoft.durabletask.*;
import com.microsoft.durabletask.azuremanaged.DurableTaskSchedulerWorkerExtensions;
import java.time.Duration;
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "MonitoringJobOrchestrator"; }
@Override
public TaskOrchestration create() {
return ctx -> {
JobData jobData = ctx.getInput(JobData.class);
int pollingCount = 0;
// Set initial status
ctx.setCustomStatus(new JobStatus("Starting monitoring..."));
while (true) {
// Update status
ctx.setCustomStatus(new JobStatus(
"Polling job status (attempt " + (++pollingCount) + ")"));
// Wait for polling interval
ctx.createTimer(Duration.ofSeconds(jobData.pollingIntervalSeconds)).await();
// Check if job is complete (simulated after 3 attempts)
if (pollingCount >= 3) {
ctx.setCustomStatus(new JobStatus("Job completed successfully"));
ctx.complete(new JobResult(
"COMPLETED",
"Job completed after " + pollingCount + " attempts"));
break;
}
}
};
}
})
.build();
このオーケストレーターは、次のアクションを実行します。
- ジョブ ID、ポーリング間隔、タイムアウトを入力パラメーターとして受け取ります。
- 開始時刻を記録し、有効期限を計算します。
- ジョブの状態をチェックするポーリング ループに入ります。
- クライアントが進行状況を監視できるように、カスタム状態を更新します。
- ジョブが完了すると、最終的な結果が返されます。
- タイムアウトに達した場合は、タイムアウト状態を返します。
-
CreateTimerを使用して、ポーリング試行の間隔でリソースを消費せずに待機します。
活動
他のサンプルと同様に、ヘルパー アクティビティ関数は、 activityTrigger トリガー バインドを使用する通常の関数です。
状態確認活動
E3_GetIsClear関数は、Weather Underground API を使用して現在の気象条件を取得し、空が晴れているかどうかを判断します。
[FunctionName("E3_GetIsClear")]
public static async Task<bool> GetIsClear([ActivityTrigger] Location location)
{
var currentConditions = await WeatherUnderground.GetCurrentConditionsAsync(location);
return currentConditions.Equals(WeatherCondition.Clear);
}
function.json の定義は次のようになります。
{
"bindings": [
{
"name": "location",
"type": "activityTrigger",
"direction": "in"
}
],
"disabled": false
}
その実装を次に示します。
const request = require("request");
const clearWeatherConditions = [
"Overcast",
"Clear",
"Partly Cloudy",
"Mostly Cloudy",
"Scattered Clouds",
];
module.exports = function (context, location) {
getCurrentConditions(location)
.then(function (data) {
const isClear = clearWeatherConditions.includes(data.weather);
context.done(null, isClear);
})
.catch(function (err) {
context.log(`E3_GetIsClear encountered an error: ${err}`);
context.done(err);
});
};
function getCurrentConditions(location) {
return new Promise(function (resolve, reject) {
const options = {
url: `https://api.wunderground.com/api/${process.env["WeatherUndergroundApiKey"]}/conditions/q/${location.state}/${location.city}.json`,
method: "GET",
json: true,
};
request(options, function (err, res, body) {
if (err) {
reject(err);
}
if (body.error) {
reject(body.error);
}
if (body.response.error) {
reject(body.response.error);
}
resolve(body.current_observation);
});
});
}
E3_TooManyOpenIssues関数は、リポジトリで現在開かれている問題の一覧を取得し、サンプルに従って 3 つ以上の "多すぎる" 問題があるかどうかを判断します。
function.json の定義は次のようになります。
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "repoID",
"type": "activityTrigger",
"direction": "in"
}
]
}
実装は次のとおりです。
import requests
import json
def main(repoID: str) -> str:
# We use the GitHub API to count the number of open issues in the repo provided
# Note that the GitHub API only displays at most 30 issues per response, so
# the maximum number this activity will return is 30. That's enough for demo'ing purposes.
[user, repo] = repoID.split("/")
url = f"https://api.github.com/repos/{user}/{repo}/issues?state=open"
res = requests.get(url)
if res.status_code != 200:
error_message = f"Could not find repo {user} under {repo}! API endpoint hit was: {url}"
raise Exception(error_message)
issues = json.loads(res.text)
too_many_issues: bool = len(issues) >= 3
return too_many_issues
このサンプルでは使用できません。 Javaカバレッジについては、Durable Task SDKs ピボットを参照してください。
E3_GetIsClear関数は、weather API を使用して、現在の気象条件の場所を確認します。
@FunctionName("E3_GetIsClear")
public boolean getIsClear(
@DurableActivityTrigger(name = "location") Location location) {
// Call weather API to check current conditions
String conditions = getWeatherConditions(location);
return conditions.equals("Clear");
}
アラート アクティビティ
E3_SendGoodWeatherAlert関数は、Twilio バインディングを使用して、歩いて良い時間であることをエンド ユーザーに通知する SMS メッセージを送信します。
[FunctionName("E3_SendGoodWeatherAlert")]
public static void SendGoodWeatherAlert(
[ActivityTrigger] string phoneNumber,
ILogger log,
[TwilioSms(AccountSidSetting = "TwilioAccountSid", AuthTokenSetting = "TwilioAuthToken", From = "%TwilioPhoneNumber%")]
out CreateMessageOptions message)
{
message = new CreateMessageOptions(new PhoneNumber(phoneNumber));
message.Body = $"The weather's clear outside! Go take a walk!";
}
internal class WeatherUnderground
{
private static readonly HttpClient httpClient = new HttpClient();
private static IReadOnlyDictionary<string, WeatherCondition> weatherMapping = new Dictionary<string, WeatherCondition>()
{
{ "Clear", WeatherCondition.Clear },
{ "Overcast", WeatherCondition.Clear },
{ "Cloudy", WeatherCondition.Clear },
{ "Clouds", WeatherCondition.Clear },
{ "Drizzle", WeatherCondition.Precipitation },
{ "Hail", WeatherCondition.Precipitation },
{ "Ice", WeatherCondition.Precipitation },
{ "Mist", WeatherCondition.Precipitation },
{ "Precipitation", WeatherCondition.Precipitation },
{ "Rain", WeatherCondition.Precipitation },
{ "Showers", WeatherCondition.Precipitation },
{ "Snow", WeatherCondition.Precipitation },
{ "Spray", WeatherCondition.Precipitation },
{ "Squall", WeatherCondition.Precipitation },
{ "Thunderstorm", WeatherCondition.Precipitation },
};
internal static async Task<WeatherCondition> GetCurrentConditionsAsync(Location location)
{
var apiKey = Environment.GetEnvironmentVariable("WeatherUndergroundApiKey");
if (string.IsNullOrEmpty(apiKey))
{
throw new InvalidOperationException("The WeatherUndergroundApiKey environment variable was not set.");
}
var callString = string.Format("http://api.wunderground.com/api/{0}/conditions/q/{1}/{2}.json", apiKey, location.State, location.City);
var response = await httpClient.GetAsync(callString);
var conditions = await response.Content.ReadAsAsync<JObject>();
JToken currentObservation;
if (!conditions.TryGetValue("current_observation", out currentObservation))
{
JToken error = conditions.SelectToken("response.error");
if (error != null)
{
throw new InvalidOperationException($"API returned an error: {error}.");
}
else
{
throw new ArgumentException("Could not find weather for this location. Try being more specific.");
}
}
return MapToWeatherCondition((string)(currentObservation as JObject).GetValue("weather"));
}
private static WeatherCondition MapToWeatherCondition(string weather)
{
foreach (var pair in weatherMapping)
{
if (weather.Contains(pair.Key))
{
return pair.Value;
}
}
return WeatherCondition.Other;
}
}
注
サンプル コードを実行するには、Microsoft.Azure.WebJobs.Extensions.Twilio Nuget パッケージをインストールする必要があります。
その function.json は簡単です。
{
"bindings": [
{
"name": "phoneNumber",
"type": "activityTrigger",
"direction": "in"
},
{
"type": "twilioSms",
"name": "message",
"from": "%TwilioPhoneNumber%",
"accountSidSetting": "TwilioAccountSid",
"authTokenSetting": "TwilioAuthToken",
"direction": "out"
}
],
"disabled": false
}
SMS メッセージを送信するコードを次に示します。
module.exports = function (context, phoneNumber) {
context.bindings.message = {
body: `The weather's clear outside! Go take a walk!`,
to: phoneNumber,
};
context.done();
};
E3_SendAlert関数は、Twilio バインディングを使用して、解決を待っている未解決の問題が少なくとも 3 つあることをエンド ユーザーに通知する SMS メッセージを送信します。
その function.json は簡単です。
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "repoID",
"type": "activityTrigger",
"direction": "in"
}
]
}
SMS メッセージを送信するコードを次に示します。
import json
import random
random.seed(10)
def main(phoneNumber: str, message):
payload = {
"body": f"Hey! You may want to check on your repo, there are too many open issues",
"to": phoneNumber
}
message.set(json.dumps(payload))
return "Message sent!"
このサンプルでは使用できません。 Javaカバレッジについては、Durable Task SDK ピボットを参照してください。
E3_SendGoodWeatherAlert関数は、SMS 通知をユーザーに送信します。
@FunctionName("E3_SendGoodWeatherAlert")
public void sendGoodWeatherAlert(
@DurableActivityTrigger(name = "phoneNumber") String phoneNumber) {
// Send an SMS alert using your preferred messaging service
sendSms(phoneNumber, "The weather is clear outside! Enjoy your day.");
}
このアクティビティは、ジョブの現在の状態を確認します。 実際のアプリケーションでは、外部 API またはサービスが呼び出されます。
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
[DurableTask(nameof(CheckJobStatusActivity))]
public class CheckJobStatusActivity : TaskActivity<CheckJobInput, JobStatus>
{
private readonly ILogger<CheckJobStatusActivity> _logger;
public CheckJobStatusActivity(ILogger<CheckJobStatusActivity> logger)
{
_logger = logger;
}
public override Task<JobStatus> RunAsync(TaskActivityContext context, CheckJobInput input)
{
_logger.LogInformation("Checking status for job: {JobId} (check #{CheckCount})",
input.JobId, input.CheckCount + 1);
// Simulate job status - completes after 3 checks
var status = input.CheckCount >= 3 ? "Completed" : "Running";
return Task.FromResult(new JobStatus
{
JobId = input.JobId,
Status = status,
CheckCount = input.CheckCount + 1,
LastCheckTime = DateTime.UtcNow
});
}
}
// Data classes
public class JobMonitorInput
{
public string JobId { get; set; }
public int PollingIntervalSeconds { get; set; } = 5;
public int TimeoutSeconds { get; set; } = 30;
}
public class CheckJobInput
{
public string JobId { get; set; }
public int CheckCount { get; set; }
}
public class JobStatus
{
public string JobId { get; set; }
public string Status { get; set; }
public int CheckCount { get; set; }
public DateTime LastCheckTime { get; set; }
}
public class JobMonitorResult
{
public string JobId { get; set; }
public string FinalStatus { get; set; }
public int ChecksPerformed { get; set; }
}
import { ActivityContext } from "@microsoft/durabletask-js";
const checkJobStatus = async (
_ctx: ActivityContext,
input: { jobId: string; checkCount: number }
): Promise<any> => {
console.log(
`Checking status for job: ${input.jobId} (check #${input.checkCount + 1})`
);
// Simulate job status — completes after 3 checks
const status = input.checkCount >= 3 ? "Completed" : "Running";
return {
jobId: input.jobId,
status,
checkCount: input.checkCount + 1,
lastCheckTime: new Date().toISOString(),
};
};
import datetime
from durabletask import task
def check_job_status(ctx: task.ActivityContext, job_data: dict) -> dict:
"""
Activity that checks the status of a long-running job.
In a real application, this would call an external API or service.
"""
job_id = job_data.get("job_id", "unknown")
check_count = job_data.get("check_count", 0)
# Simulate job status - completes after 3 checks
if check_count >= 3:
status = "Completed"
else:
status = "Running"
return {
"job_id": job_id,
"status": status,
"check_count": check_count + 1,
"last_check_time": datetime.datetime.now().isoformat()
}
このサンプルは、.NET、JavaScript、Java、およびPythonについて示されています。
Javaサンプルでは、オーケストレーターで状態チェックが直接シミュレートされます。 実際のアプリケーションでは、別のアクティビティを作成します。
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CheckJobStatus"; }
@Override
public TaskActivity create() {
return ctx -> {
JobCheckInput input = ctx.getInput(JobCheckInput.class);
// Simulate checking job status
// In a real app, this would call an external API
String status = input.checkCount >= 3 ? "Completed" : "Running";
return new JobStatus(status, input.checkCount + 1);
};
}
})
モニターのサンプルを実行する
サンプルに含まれる HTTP によってトリガーされる関数を使って、次の HTTP POST 要求を送信することによりオーケストレーションを開始できます。
POST https://{host}/orchestrators/E3_Monitor
Content-Length: 77
Content-Type: application/json
{ "location": { "city": "Redmond", "state": "WA" }, "phone": "+1425XXXXXXX" }
サンプルに含まれる HTTP によってトリガーされる関数を使って、次の HTTP POST 要求を送信することによりオーケストレーションを開始できます。
POST https://{host}/orchestrators/E3_Monitor
Content-Length: 77
Content-Type: application/json
{ "location": { "city": "Redmond", "state": "WA" }, "phone": "+1425XXXXXXX" }
GitHub アカウントが必要です。 問題を開くことができる一時パブリック リポジトリを作成します。
サンプルに含まれる HTTP によってトリガーされる関数を使って、次の HTTP POST 要求を送信することによりオーケストレーションを開始できます。
POST https://{host}/orchestrators/E3_Monitor
Content-Length: 77
Content-Type: application/json
{ "repo": "<your GitHub handle>/<a new GitHub repo under your user>", "phone": "+1425XXXXXXX" }
たとえば、GitHubユーザー名が foo で、リポジトリが bar の場合、"repo" の値は "foo/bar" にする必要があります。
このサンプルでは使用できません。 Javaカバレッジについては、Durable Task SDK ピボットを参照してください。
サンプルに含まれている HTTP によってトリガーされる関数を使用して、次の HTTP POST 要求を送信することでオーケストレーションを開始できます。
POST https://{host}/api/StartMonitor
Content-Type: application/json
{ "location": { "city": "Redmond", "state": "WA" }, "phone": "+1425XXXXXXX" }
HTTP トリガー関数は、オーケストレーションをスケジュールします。
@FunctionName("StartMonitor")
public HttpResponseMessage startMonitor(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}) HttpRequestMessage<Optional<String>> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance("E3_Monitor", req.getBody().get());
context.getLogger().info("Started monitor orchestration with ID = " + instanceId);
return durableContext.createCheckStatusResponse(req, instanceId);
}
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Location: https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635?taskHub=SampleHubVS&connection=Storage&code={SystemKey}
RetryAfter: 10
{"id": "f6893f25acf64df2ab53a35c09d52635", "statusQueryGetUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635?taskHub=SampleHubVS&connection=Storage&code={systemKey}", "sendEventPostUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/raiseEvent/{eventName}?taskHub=SampleHubVS&connection=Storage&code={systemKey}", "terminatePostUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/terminate?reason={text}&taskHub=SampleHubVS&connection=Storage&code={systemKey}"}
E3_Monitor インスタンスが起動し、現在の条件に対してクエリを実行します。 条件が満たされると、アクティビティ関数を呼び出してアラートを送信します。それ以外の場合は、タイマーを設定します。 タイマーが切れると、オーケストレーションが再開されます。
オーケストレーションのアクティビティは、Azure Functions ポータルで関数ログを確認することで確認できます。
オーケストレーションは、自らのタイムアウトに達したとき、または条件が検出されたときに完了します。 また、別の関数内で terminate API を使用したり、前の 202 応答で参照されている terminatePostUri HTTP POST webhook を呼び出すこともできます。 webhook を使用するには、 {text} を早期終了の理由に置き換えます。 HTTP POST URL は、大体次のようなものになります。
POST https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/terminate?reason=Because&taskHub=SampleHubVS&connection=Storage&code={systemKey}
サンプルを実行するには、次のものが必要です。
Durable Task Scheduler エミュレーターを起動 します (ローカル開発用)。
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
ワーカーを起動 してオーケストレーターとアクティビティを登録します。
クライアントを実行 して監視オーケストレーションをスケジュールします。
using System;
using System.Threading.Tasks;
var client = DurableTaskClientBuilder.UseDurableTaskScheduler(connectionString).Build();
// Schedule the monitoring orchestration
var input = new JobMonitorInput
{
JobId = "job-" + Guid.NewGuid().ToString(),
PollingIntervalSeconds = 5,
TimeoutSeconds = 30
};
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(MonitoringJobOrchestration), input);
Console.WriteLine($"Started monitoring orchestration: {instanceId}");
// Wait for completion while checking status
while (true)
{
var state = await client.GetInstanceMetadataAsync(instanceId, getInputsAndOutputs: true);
if (state.RuntimeStatus == OrchestrationRuntimeStatus.Completed ||
state.RuntimeStatus == OrchestrationRuntimeStatus.Failed)
{
Console.WriteLine($"Monitoring completed: {state.ReadOutputAs<JobMonitorResult>().FinalStatus}");
break;
}
Console.WriteLine($"Current status: {state.ReadCustomStatusAs<JobStatus>()?.Status}");
await Task.Delay(2000);
}
import {
DurableTaskAzureManagedClientBuilder,
DurableTaskAzureManagedWorkerBuilder,
} from "@microsoft/durabletask-js-azuremanaged";
const client = new DurableTaskAzureManagedClientBuilder()
.connectionString(connectionString)
.build();
const worker = new DurableTaskAzureManagedWorkerBuilder()
.connectionString(connectionString)
.addOrchestrator(monitorOrchestrator)
.addActivity(checkJobStatus)
.build();
await worker.start();
// Schedule the monitoring orchestration
const input = {
jobId: `job-${Date.now()}`,
pollingIntervalSeconds: 5,
timeoutSeconds: 30,
};
const instanceId = await client.scheduleNewOrchestration(
monitorOrchestrator,
input
);
console.log(`Started monitoring orchestration: ${instanceId}`);
// Wait for completion
const result = await client.waitForOrchestrationCompletion(
instanceId,
true,
60
);
console.log(`Final result: ${result?.serializedOutput}`);
await worker.stop();
await client.stop();
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
import time
client = DurableTaskSchedulerClient(
host_address=endpoint,
secure_channel=endpoint != "http://localhost:8080",
taskhub=taskhub,
token_credential=credential
)
# Schedule the monitoring orchestration
job_data = {
"job_id": "job-123",
"polling_interval_seconds": 5,
"timeout_seconds": 30
}
instance_id = client.schedule_new_orchestration(
monitoring_job_orchestrator,
input=job_data
)
print(f"Started monitoring orchestration: {instance_id}")
# Wait for completion
result = client.wait_for_orchestration_completion(instance_id, timeout=60)
print(f"Final result: {result.serialized_output}")
このサンプルは、.NET、JavaScript、Java、およびPythonについて示されています。
import java.time.Duration;
import java.util.UUID;
DurableTaskClient client = DurableTaskSchedulerClientExtensions
.createClientBuilder(connectionString).build();
// Schedule the monitoring orchestration
JobData jobData = new JobData(
"job-" + UUID.randomUUID().toString(),
5, // polling interval seconds
30 // timeout seconds
);
String instanceId = client.scheduleNewOrchestrationInstance(
"MonitoringJobOrchestrator",
new NewOrchestrationInstanceOptions().setInput(jobData));
System.out.println("Started monitoring orchestration: " + instanceId);
// Wait for completion
OrchestrationMetadata result = client.waitForInstanceCompletion(
instanceId, Duration.ofSeconds(60), true);
System.out.println("Final result: " + result.readOutputAs(JobResult.class).status);
次のステップ
このサンプルでは、Durable Functionsを使用して、保証可能タイマーと条件付きロジックを使用して外部ソースの状態を監視する方法を示します。 次のサンプルでは、外部イベントと 永続的タイマー を使用して人間の相互作用を処理する方法を示します。
このサンプルでは、Durable Task SDK を使用して、永続的タイマーと状態追跡を使用して監視パターンを実装する方法を示しました。 その他のパターンと機能の詳細を確認します。