Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Dit eenvoudige voorbeeld voor de activiteitscoördinator laat zien hoe de API kan worden gebruikt om een model op de achtergrond opnieuw te trainen wanneer aan systeemvoorwaarden wordt voldaan.
Voorbeeldprojectoverzicht
Laten we eens kijken naar het geval van een app voor het bewerken van muziek. Deze app heeft achtergrondtaken met hoge prioriteit die gebruikersaanvragen verwerken, zoals het publiceren van inhoud naar cloudopslag. Er zijn ook achtergrondtaken met lage prioriteit die ondersteuning bieden voor gebruikersinteractie, zoals het bieden van automatische aanbevelingen om een samenstelling te verbeteren tijdens het bewerken. Ten slotte zijn er een reeks uitgestelde taken die niet op een bepaald moment hoeven te worden uitgevoerd zonder de aanvraag van de gebruiker. Dit is onze focus in dit voorbeeld. In het bijzonder willen we het aanbevelingsmodel regelmatig opnieuw trainen wanneer de impact van de gebruiker minimaal is. We kunnen de API van de activiteitscoördinator gebruiken om dit te bereiken.
Voor dit scenario willen we het model opnieuw trainen wanneer de gebruiker niet aanwezig is. De werkstroom voor opnieuw trainen in dit scenario is ook een GPU-consument, dus we willen ook worden uitgevoerd wanneer het een goed moment is om de GPU te gebruiken. We kunnen deze vereisten opgeven met behulp van beleid voor activiteitencoördinatoren. De Activity Coordinator-API gebruikt ons beleid om te bepalen wanneer aan de vereisten wordt voldaan en om meldingen te verzenden voor wanneer we ons werk moeten starten of stoppen.
In dit geval voldoet de GOOD-beleidssjabloon aan de meeste behoeften, omdat hiermee CPU, geheugen, systeemschijf, energie en gebruikersinactiviteit worden bijgehouden. We hoeven alleen maar expliciet een voorwaarde voor GPU in te stellen. Het is belangrijk om te onthouden dat hoewel onze workload voornamelijk gebruikmaakt van de GPU, de uitvoering van onze activiteit nog steeds inherent CPU, geheugen, schijf en energie verbruikt. Onze impact op deze resources kan ook sterk verschillen tussen systeemconfiguraties. Een snellere GPU kan er bijvoorbeeld toe leiden dat de CPU meer tijd besteedt aan het invoeren van de GPU met gegevens, wat kan leiden tot meer gegevens die worden gelezen of opgeslagen op schijf. De snelheid van deze schijf kan ook van invloed zijn op het CPU-verbruik op een vergelijkbare manier. Door alle resources te configureren die we beïnvloeden, kunnen we er zeker van zijn dat we de gebruikerservaring niet per ongeluk verstoren of de systeemprestaties verminderen. Daarnaast is het werk zelf opgesplitst in kleine segmenten, zodat we adequaat kunnen reageren op coördinatiemeldingen om te voorkomen dat er buiten de gewenste omstandigheden wordt uitgevoerd.
Om te laten zien hoe ontwikkelaars beleidsregels kunnen wijzigen of downgraden, voegen we ook de vereiste toe die we binnen 48 uur opnieuw willen trainen. De eerste 24 uur, onze zachte deadline, proberen we te worden uitgevoerd met ons ideale beleid en de afgelopen 24 uur downgraden we naar een lager beleid.
Voorbeeldprojectcode
De volgende code is de voorbeeldtoepassing voor het bewerken van muziek. De API voor activiteitscoördinator wordt gebruikt om achtergrondtaken uit te voeren, zoals beschreven in het overzicht.
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <Windows.h>
#include <ActivityCoordinator.h>
#include <wil/resource.h>
// To use ActivityCoordinator, we must link to the OneCoreUAP library.
#pragma comment(lib, "OneCoreUAP.lib")
using namespace std;
using namespace chrono;
using namespace wil;
// Declare RAII wrappers for the Activity Coordinator policy and subscription.
// These behave like traditional smart pointers and will call their associated
// API cleanup functions when they go out of scope.
typedef wil::unique_any<
ACTIVITY_COORDINATOR_POLICY,
decltype(&DestroyActivityCoordinatorPolicy),
DestroyActivityCoordinatorPolicy>
unique_policy;
typedef wil::unique_any<
ACTIVITY_COORDINATOR_SUBSCRIPTION,
decltype(&UnsubscribeActivityCoordinatorPolicy),
UnsubscribeActivityCoordinatorPolicy>
unique_subscription;
struct WORKER_CONTEXT {
mutex ContextLock;
unique_threadpool_work Worker;
bool ShouldRun;
bool IsRunning;
bool IsComplete;
std::condition_variable CompletionSignal;
};
_Requires_lock_held_(workerContext->ContextLock)
void
ResumeWorker(
_In_ WORKER_CONTEXT* workerContext
)
{
workerContext->ShouldRun = true;
if (!workerContext->IsRunning && !workerContext->IsComplete) {
// No active workers, so start a new one.
workerContext->IsRunning = true;
SubmitThreadpoolWork(workerContext->Worker.get());
}
}
void
DeferredWorkEventCallback(
_In_ ACTIVITY_COORDINATOR_NOTIFICATION notificationType,
_In_ void* callbackContext
)
{
WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
// Use this callback thread to dispatch notifications to a worker thread
// about whether or not it should process the next chunk of deferred work.
// Note: Do not use this thread to perform your activity's workload.
lock_guard<mutex> scopedLock(workerContext->ContextLock);
switch (notificationType) {
case ACTIVITY_COORDINATOR_NOTIFICATION_RUN:
// Allow deferred work to be processed.
ResumeWorker(workerContext);
break;
case ACTIVITY_COORDINATOR_NOTIFICATION_STOP:
// Stop processing deferred work.
workerContext->ShouldRun = false;
break;
default:
FAIL_FAST();
break;
}
}
bool
TrainNextModelChunk(
)
{
//
// Returns true if all work is completed, or false if there is more work.
//
return false;
}
void
DeferredModelTrainingWorker(
_Inout_ PTP_CALLBACK_INSTANCE callbackInstance,
_Inout_opt_ PVOID callbackContext,
_Inout_ PTP_WORK work
)
{
// Threadpool callback instance and work are not needed for this sample.
UNREFERENCED_PARAMETER(callbackInstance);
UNREFERENCED_PARAMETER(work);
WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
bool workComplete = false;
// Keep processing work until being told to stop or all work has been completed.
while (true) {
{
lock_guard<mutex> scopedLock(workerContext->ContextLock);
if (workComplete) {
workerContext->IsComplete = true;
}
if (!workerContext->ShouldRun || workerContext->IsComplete) {
workerContext->IsRunning = false;
break;
}
}
// TrainNextModelChunk returns true when there is no more work to do.
workComplete = TrainNextModelChunk();
}
workerContext->CompletionSignal.notify_all();
}
int
__cdecl
wmain(
)
{
WORKER_CONTEXT workerContext;
workerContext.ShouldRun = false;
workerContext.IsRunning = false;
workerContext.IsComplete = false;
// Create the worker that will be started by our subscription callback.
workerContext.Worker.reset(CreateThreadpoolWork(
DeferredModelTrainingWorker,
&workerContext,
nullptr));
RETURN_LAST_ERROR_IF_NULL(workerContext.Worker);
// Allocate a policy suited for tasks that are best run when unlikely
// to cause impact to the user or system performance.
unique_policy policy;
RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
ACTIVITY_COORDINATOR_POLICY_TEMPLATE_GOOD,
&policy));
// The model training in this sample consumes GPU.
// The GOOD policy template doesn't currently include the GPU resource. We
// therefore customize the policy to include good GPU conditions to minimize
// the impact of running our work.
RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
policy.get(),
ACTIVITY_COORDINATOR_RESOURCE_GPU,
ACTIVITY_COORDINATOR_CONDITION_GOOD));
// Subscribe to the policy for coordination notifications.
unique_subscription subscription;
RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
policy.get(),
DeferredWorkEventCallback,
&workerContext,
&subscription));
// Destroy the policy because we no longer need it.
policy.reset();
// We want our task to complete within 48h, so we allocate 24h under our
// ideal policy and before falling back to a downgraded policy.
bool workerCompleted;
{
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerCompleted = workerContext.CompletionSignal.wait_for(
scopedLock,
hours(24),
[&workerContext] { return workerContext.IsComplete; });
}
if (workerCompleted) {
// Since our work is complete, we should clean up our subscription by
// unsubscribing. This would normally be handled quietly by our RAII
// types, but we release them explicitly to demonstrate API flow for
// developers manually managing resources.
subscription.reset();
return S_OK;
}
// We passed our soft deadline, so downgrade the policy and wait the
// remaining 24h until our hard deadline has been reached. Since
// Subscriptions and policies are independent of each other, we need to
// create a new subscription with our downgraded policy to receive
// notifications based on its configuration.
//
// The downgraded policy uses medium conditions for all needed resources.
// This gives us the best chance to run while helping to prevent us from
// critically degrading the user experience, which we are more likely to do
// when falling back to manual execution.
RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
ACTIVITY_COORDINATOR_POLICY_TEMPLATE_MEDIUM,
&policy));
RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
policy.get(),
ACTIVITY_COORDINATOR_RESOURCE_GPU,
ACTIVITY_COORDINATOR_CONDITION_MEDIUM));
subscription.reset();
RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
policy.get(),
DeferredWorkEventCallback,
&workerContext,
&subscription));
{
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerCompleted = workerContext.CompletionSignal.wait_for(
scopedLock,
hours(24),
[&workerContext] { return workerContext.IsComplete; });
}
// We passed our deadline, so unsubscribe and manually resume our task.
subscription.reset();
ResumeWorker(&workerContext);
// We destroyed our subscription, so we wait indefinitely for completion as
// there's nothing to pause execution of our task.
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerContext.CompletionSignal.wait(
scopedLock,
[&workerContext] { return workerContext.IsComplete; });
return S_OK;
}
Verwante onderwerpen
Overzicht van Activity Coordinator-API
api en terminologie voor activiteitscoördinator