SynkroniseringContext- och konsolappar

Användargränssnittsramverk som Windows Forms, WPF och .NET MAUI installerar en SynchronizationContext i användargränssnittstråden. När du await har en uppgift i dessa miljöer skickas fortsättningen automatiskt tillbaka till användargränssnittstråden. Konsolappar installerar inte en SynchronizationContext, vilket innebär att await fortsättningar körs på trådpoolen. Den här artikeln förklarar konsekvenserna och visar hur du skapar en entrådad meddelandepump när du behöver en.

Standardbeteende i en konsolapp

I en konsolapplikation SynchronizationContext.Current returnerar null. När en metod ger vid en awaitkörs fortsättningen på den trådpoolstråd som är tillgänglig:

static void DefaultBehaviorDemo()
{
    DemoAsync().GetAwaiter().GetResult();
}

static async Task DemoAsync()
{
    var d = new Dictionary<int, int>();
    for (int i = 0; i < 10_000; i++)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

        await Task.Yield();
    }

    foreach (var pair in d)
        Console.WriteLine(pair);
}

Representativa utdata från att köra det här programmet:

[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]

Tråd 1 (huvudtråden) visas bara en gång under den första synkrona iterationen innan await Task.Yield() metoden pausas. Alla efterföljande iterationer körs på trådpooltrådar.

Moderna asynkrona startpunkter

Från och med C# 7.1 kan du deklarera Main som async Task eller async Task<int>. I C# 9 och senare kan du använda toppnivåinstruktioner direkt await :

// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
    await DemoAsync();
}

Dessa startpunkter installerar inte en SynchronizationContext. Körtiden genererar en bootstrap som anropar din asynkrona metod och väntar på det returnerade Task, liknande att anropa .GetAwaiter().GetResult(). Fortsättningar fortsätter att köras i trådpoolen.

När du behöver trådanknytning

För många konsolapplikationer går det bra att köra fortsättningar på trådpoolen. Vissa scenarier kräver dock att alla fortsättningar körs på en enda tråd:

  • Serialiserad körning: Flera konkurrerande asynkrona operationer delar tillstånd utan lås genom att köra fortsättningarna på samma tråd.
  • Bibliotekskrav: Vissa bibliotek eller COM-objekt kräver tillhörighet till en viss tråd.
  • Enhetstestning: Testramverk kan behöva deterministisk, enkeltrådad körning av asynkron kod.

Skapa en enkeltrådad SynchroniseringContext

Om du vill köra alla fortsättningar på en tråd behöver du två saker:

  1. En SynchronizationContext vars Post metodköer fungerar till en trådsäker samling.
  2. En meddelandepumpsloop som bearbetar den kö som finns i måltråden.

Den anpassade kontexten

Kontexten använder en BlockingCollection<T> för att samordna producenterna (de asynkrona fortsättningarna) och konsumenten (pumpningsloopen):

sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

AsyncPump.Run-metoden

AsyncPump.Run installerar den anpassade kontexten, anropar async-metoden och pumpar fortsättningar på den anropande tråden tills metoden har slutförts:

static class AsyncPump
{
    public static void Run(Func<Task> func)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Task t;
            try
            {
                t = func();
            }
            catch
            {
                syncCtx.Complete();
                throw;
            }

            t.ContinueWith(
                _ => syncCtx.Complete(), TaskScheduler.Default);

            syncCtx.RunOnCurrentThread();

            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
Class AsyncPump
    Public Shared Sub Run(func As Func(Of Task))
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New SingleThreadSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            Dim t As Task
            Try
                t = func()
            Catch
                syncCtx.Complete()
                Throw
            End Try

            t.ContinueWith(
                Sub(unused) syncCtx.Complete(), TaskScheduler.Default)

            syncCtx.RunOnCurrentThread()

            t.GetAwaiter().GetResult()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub

Se den i praktiken

Ersätt standardanropet med AsyncPump.Run:

static void AsyncPumpDemo()
{
    AsyncPump.Run(async () =>
    {
        var d = new Dictionary<int, int>();
        for (int i = 0; i < 10_000; i++)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

            await Task.Yield();
        }

        foreach (var pair in d)
            Console.WriteLine(pair);
    });
}
Sub AsyncPumpDemo()
    AsyncPump.Run(
        Async Function() As Task
            Dim d As New Dictionary(Of Integer, Integer)()
            For i As Integer = 0 To 9999
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId
                Dim count As Integer
                If d.TryGetValue(id, count) Then
                    d(id) = count + 1
                Else
                    d(id) = 1
                End If

                Await Task.Yield()
            Next

            For Each pair In d
                Console.WriteLine(pair)
            Next
        End Function)
End Sub

Resultat:

[1, 10000]

Det specifika tråd-ID:t kan variera beroende på körning och plattform, men det viktigaste resultatet är att alla 10 000 iterationer körs på en enda tråd: huvudtråden.

Hantera asynkrona void-metoder

Överbelastningen Func<Task> spårar slutförandet via det returnerade Task. Asynkrona void metoder returnerar inte en uppgift. I stället meddelar de den aktuella SynchronizationContext via OperationStarted() och OperationCompleted(). För att stödja asynkrona void metoder, utöka kontexten för att spåra utestående operationer.

    public static void Run(Action asyncMethod)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new AsyncVoidSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Exception? caughtException = null;

            syncCtx.OperationStarted();
            try
            {
                asyncMethod();
            }
            catch (Exception ex)
            {
                caughtException = ex;
                syncCtx.Complete();
            }
            finally
            {
                syncCtx.OperationCompleted();
            }

            syncCtx.RunOnCurrentThread();

            if (caughtException is not null)
            {
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
}

sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
    private int _operationCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public override void OperationStarted() =>
        Interlocked.Increment(ref _operationCount);

    public override void OperationCompleted()
    {
        if (Interlocked.Decrement(ref _operationCount) == 0)
            Complete();
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
    Public Shared Sub Run(asyncMethod As Action)
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New AsyncVoidSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            syncCtx.OperationStarted()
            Try
                asyncMethod()
            Catch
                syncCtx.Complete()
                Throw
            Finally
                syncCtx.OperationCompleted()
            End Try

            syncCtx.RunOnCurrentThread()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub
End Class

Class AsyncVoidSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
    Private _operationCount As Integer

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Overrides Sub OperationStarted()
        Interlocked.Increment(_operationCount)
    End Sub

    Public Overrides Sub OperationCompleted()
        If Interlocked.Decrement(_operationCount) = 0 Then
            Complete()
        End If
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

När driftspårningen är aktiverad avslutas pumpen endast när alla enastående asynkrona void metoder har slutförts, inte bara uppgiften på den översta nivån.

Praktiska överväganden

  • Dödlägesrisk: Om koden körs inuti AsyncPump.Run block synkront (till exempel genom att anropa .Result eller .Wait() på en uppgift vars fortsättning måste skickas tillbaka till pumpen), kan pumptråden inte bearbeta den fortsättningen. Resultatet är ett dödläge. Samma problem beskrivs i Synkrona omslutningar för asynkrona metoder.
  • Prestanda: En entrådad pump begränsar genomströmningen till en tråd. Använd den här metoden endast när trådanknytning är avgörande.
  • Plattformsoberoende: Implementeringen som visas här använder endast typer från namnrymderna System.Collections.Concurrent och System.Threading. Det fungerar på alla plattformar som .NET stöder.

Se även