sobota, 19 marca 2022

Wielowątkowość

Z własnego doświadczenia znam co najmniej kilka powodów, dla których jednoczesne przetwarzanie wielu fragmentów kodu jest atrakcyjnym rozwiązaniem. Jednym z nich jest chęć wykorzystania mocy obliczeniowej, jaką dysponują wielordzeniowe procesory. Innym powodem może być potrzeba zapewnienia skalowalności czy responsywności aplikacji poprzez wykonywanie zadań w tle. Miejmy jednak na uwadze, że wielowątkowość to nie tylko korzyści, ale i zagrożenia. Czynnikami spowalniającymi mogą okazać się przełączenia pomiędzy wątkami, o ile liczba wątków przekracza liczbę wątków sprzętowych. Kolejnym czynnikiem jest narzut czasu potrzebnego na przygotowanie wątku.

Znając korzyści oraz zagrożenia, przyjrzyjmy się klasom związanym z wielowątkowością. Zaczynamy od Thread, umożliwiającej tworzenie wątku oraz zarządzanie nim. Konstruktor klasy wymaga przekazania delegata, który zostanie wywołany w nowym wątku. O delegatach możesz przeczytać tutaj. Analizując kod, zwróć uwagę, że samo utworzenie wątku nie uruchamia go, dopiero wywołanie metody Start powoduje jego uruchomienie. Taka architektura umożliwia rozdzielenie momentu deklaracji wątku od jego faktycznego uruchomienia. Wywołanie metody Join powoduje zawieszenie wątku, z którego metoda została wywołana, do czasu zakończenia pracy wątku pobocznego. Należy zwrócić uwagę, że użycie metody Join może doprowadzić do zmniejszenia skalowalności ze względu na zawieszenie wątku wywołującego. Kod poniżej korzysta z właściwości CurrentManagedThreadId klasy Environment, identyfikującej bieżący wątek oraz metody Sleep symulującej "pracę" trwającą określoną liczbę milisekund. Tworząc wątek, możemy ustawić właściwość Priority w celu określenia priorytetu. O ile zlecona praca nie jest kluczowa, zaleca się ustawienie wartości BelowNormal lub Lowest, dzięki czemu pierwszeństwo otrzymają wyżej priorytetyzowane wątki.

Referencje do wątku wykonującego bieżący fragment kodu możemy uzyskać poprzez Thread.CurrentThread.

Console.WriteLine($""Hello, Main Thread: {Environment.CurrentManagedThreadId}!"");
var t = new Thread(DoWork);
Console.WriteLine(""Before start thread"");
t.Start();
Console.WriteLine(""Wait for thread"");
t.Join();
Console.WriteLine(""End main thread"");

void DoWork()
{
    Console.WriteLine($""\tHello, Thread: {Environment.CurrentManagedThreadId}!"");
    Thread.Sleep(5000);
    Console.WriteLine(""\tBye Bye"");
}

Samodzielne zarządzanie życiem wątków bywa skomplikowane i błędogenne. Na ratunek przychodzi klasa ThreadPool, której głównym zadaniem jest efektywne wykorzystanie zasobów procesora, poprzez dostosowanie liczby wątków do parametrów procesora, pozwalające uniknąć kosztownego przełączenia kontekstu pomiędzy rdzeniami. Zadania do wykonania zleca się poprzez statyczną metodę QueueUserWorkItem, gdzie podobnie jak w przypadku Thread, przekazujemy delegata, z tą różnicą, że delegat WaitCallback wymaga parametru typu object. Klasa ThreadPool udostępnia zestaw właściwości: ThreadCount, CompletedWorkItemCount oraz PendingWorkItemCount, umożliwiających monitorowanie stanu kolejki. Pierwsza z właściwości zwraca liczbę wszystkich wątków, następne określają liczbę przetworzonych oraz oczekujących zadań.

Console.WriteLine($""Hello, Main Thread: {Environment.CurrentManagedThreadId}!"");
Log();
ThreadPool.QueueUserWorkItem(DoWork);
ThreadPool.QueueUserWorkItem(DoWork);
ThreadPool.QueueUserWorkItem(DoWork);
ThreadPool.QueueUserWorkItem(DoWork);
ThreadPool.QueueUserWorkItem(DoWork);
ThreadPool.QueueUserWorkItem(DoWork);
Log();
Thread.Sleep(2000);
Log();
Console.WriteLine(""End main thread"");

void Log()
{
    Console.WriteLine($""Count: {ThreadPool.ThreadCount}, Completed: {ThreadPool.CompletedWorkItemCount}, Pending: {ThreadPool.PendingWorkItemCount}"");
}

void DoWork(object obj)
{
    Console.WriteLine($""\tHello, Thread: {Environment.CurrentManagedThreadId}!"");
    Thread.Sleep(1000);
    Console.WriteLine(""\tBye Bye"");
}

Kod powyżej zleca do wykonania sześciokrotnie metodę DoWork, która czeka sekundę. W międzyczasie główny wątek wykonuje dwukrotnie metodę Log, z dwusekundową przerwą. Zadaniem metody Log jest wyświetlenie szczegółów klasy ThreadPool. W przypadku mojego urządzenia, pomiędzy wywołaniami metod Log, metoda DoWork trzykrotnie wyświetliła "Hello...". Liczba wątków w puli została zwiększona do 10, a 2 zadania zostały zakolejkowane i oczekują na realizację. Kolejny log informuje o zakończeniu zleconych zadań.

Klasa ThreadPool reprezentuje globalną kolejkę dostępną dla wszystkich wątków oraz lokalną dla poszczególnych wątków. Domyślnie zadania umieszczane są w kolejce globalnej. Korzystając z przeciążenia preferLocal: true, mamy możliwość skierowania zadania do kolejki lokalnej. Takie zadanie zostanie umieszczone w lokalnej kolejce wątku, wykonującego obecne zadanie. Jeżeli wątek jest gotowy na rozpoczęcie kolejnego zadania, zaczyna od sprawdzenia lokalnej kolejki, a następnie sprawdza kolejkę globalną, a na samym końcu lokalne kolejki innych wątków (algorytm "work stealing").

Alternatywą dla QueueUserWorkItem jest wydajniejsza metoda UnsafeQueueUserWorkItem. Różnica wynika z braku przesyłania aktualnego kontekstu ExecutionContext do wątku, wykonującego delegowaną pracę. Dodatkowo, korzystając z interfejsu IThreadPoolWorkItem, możliwe jest pominięcie alokacji instancji delegata na stercie. Delegowana praca wykonywana jest jako osobna struktura implementująca metodę Execute.

var itIsMyWork = new DoWork();
ThreadPool.UnsafeQueueUserWorkItem(itIsMyWork, false);

struct DoWork : IThreadPoolWorkItem
{
    public void Execute()
    {
        Console.WriteLine($""\tHello, Thread: {Environment.CurrentManagedThreadId}!"");
        Thread.Sleep(1000);
        Console.WriteLine(""\tBye Bye"");
    }
}

Mając zarys działania wątków, zastanówmy się nad zasobami współdzielonymi przez wątki. Każdy z wątków dysponuje własnym stosem, na który trafiają zmienne lokalne. Sprawa komplikuje się w sytuacji, gdy dany zasób dostępny jest dla wielu wątków, przykładem są statyczne pola lub właściwości oraz wyrażenia lambda, pozwalające na użycie zmiennych zewnętrznych. Takie zmienne zostaną przekształcone przez CLR w pole wygenerowane przez kompilator, bez gwarancji lokalności zmiennej. Należy również mieć na uwadze rozróżnienie pomiędzy zmienną a obiektem. O ile zmienna lokalna dostępna jest wyłącznie wewnątrz metody, to zmienna typu referencyjnego może posiadać wiele odwołań. Problem został uchwycony w kodzie poniżej.

string ToString(IDictionary parameters)
{
    var parametersAsString = new StringBuilder();

    foreach (var par in parameters)
    {
        parametersAsString.AppendLine($""{par.Key}: {par.Value}"");
    }

    return parametersAsString.ToString();
}

Zmienna parametersAsString faktycznie jest zmienną lokalną (pomimo referencyjnego typu), mamy więc pewność, że inny wątek jej nie zmodyfikuje. Zastanówmy się jednak nad argumentem parameters. Analizując wyłącznie zaprezentowany kod, nie mamy wiedzy, czy zmienna wykorzystywana jest przez inne wątki. Możemy wyobrazić sobie sytuację, w której jeden wątek aktualizuje kolekcję, a drugi próbuje ją przetwarzać. Takie działanie spowoduje zgłoszenie wyjątku InvalidOperationException, prezentuje to kod poniżej.

using System.Text;

var parameters = new Dictionary();

ThreadPool.QueueUserWorkItem(Add, parameters);

Thread.Sleep(100);

ThreadPool.QueueUserWorkItem(ToString, parameters);

Thread.Sleep(5000);

void Add(object obj)
{
    var parameters = (Dictionary) obj;

    for (int i = 0; ; i++)
    { 
        parameters.Add($""Par{i}"", i); 
    } 
}

void ToString(object obj)
{
    var parameters = (Dictionary)obj;

    foreach (var par in parameters)
    {
        Console.WriteLine($""{par.Key}: {par.Value}"");
    }
}

W tym przypadku użycie kolekcji bezpiecznej wielowątkowo może nie rozwiązać problemu, ze względu na brak przyzwolenia na modyfikację kolekcji podczas przeglądania jej za pomocą pętli foreach. Możesz zastanawiać się, po co w ogóle używać kolekcji bezpiecznych wielowątkowo? Podstawowa różnica to gwarancja wykrycia oraz zgłoszenia problemu. Kolekcje, które nie zostały zaprojektowane do współdzielenia przez wiele wątków, niczego nie gwarantują, co może doprowadzić do awarii lub błędnego działania, np. poprzez zwrócenie wielokrotnie tego samego elementu.

Do omówienia pozostaje kwestia zgłaszania wyjątków przez wątki. W sytuacji, gdy wyjątek nie zostanie przechwycony, wywołane zostanie zdarzenie UnhandledException klasy AppDomain, o ile wyjątek nie dotyczy interfejsu użytkownika. Wyjątki UI obsługiwane są przez zdarzenie ThreadException. Dokładne objaśnienie zasad działania obu zdarzeń znajdziesz tutaj. Ogólnie zalecana jest obsługa wyjątków wewnątrz wątków za pomocą sekcji try/catch.

Niektóre z obiektów wymagają, aby ich użycie następowało wyłącznie w określonych wątkach. Przykładem takich obiektów mogą być kontrolki interfejsu użytkownika. Taka zależność nazywa się powinowactwem do wątku. Powinowactwo do wątku może nieco utrudnić tworzenie kodu wielowątkowego. Załóżmy, że przygotowaliśmy algorytm wielowątkowego przetwarzania danych, wraz z jego zakończeniem musimy zwrócić dane do interfejsu użytkownika. Jednak tę czynność możemy zrealizować wyłącznie w ściśle określonym wątku zwanym wątkiem rysownika (ang. UI thread). Chcąc zagwarantować płynne działanie aplikacji, operacje długotrwałe nie mogą być realizowane w wątku rysownika. Natomiast próba zaktualizowania interfejsu z innego wątku spowoduje zgłoszenie wyjątku. W takiej sytuacji konieczna jest komunikacja z wątkiem rysownika, tak aby to on był odpowiedzialny za zwrócenie danych. W tym celu udostępniona została klasa SynchronizationContext. Statyczna właściwość Current zwraca instancję klasy reprezentującej wątek, w którym jest wykonywane bieżący kod. Wartość null oznacza, że jest to domyślny kontekst ThreadPool. Zwrócony obiekt możemy przekazać, a następnie użyć metody Post w celu wykonania kodu w określonym wątku. Poniższy przykład prezentuje fragment kodu, w którym zdarzenie Click pobiera kontekst, przekazując go wraz ze zleconą pracą. W wyniku działania metody DoWork następuje aktualizacja właściwości Text kontrolki label, przy czym następuje to w kontekście wątku obsługi interfejsu użytkownika, a nie puli wątków. Chciałbym zaznaczyć, że metoda Post nie czeka na wykonanie operacji, dzięki czemu wątek roboczy nie jest blokowany. Użycie metody Send powoduje zablokowanie wątku roboczego, do momentu zakończenia przekazanego delegata. Stosowanie metody Send nie jest zalecanym rozwiązaniem, ze względu na możliwość doprowadzenia do zakleszczenia wątków.

private void button_Click(object sender, EventArgs e)
{
    var uiContext = SynchronizationContext.Current;
    ThreadPool.QueueUserWorkItem(DoWork, uiContext);
}

private void DoWork(object obj)
{
    var result = new Random().Next(100);
    ((SynchronizationContext)obj).Post(p => {
        label.Text = result.ToString();
    }, null);
}

Na koniec poruszymy temat pamięci lokalnej wątku (ang. thread-local storage). Istnieją dwa sposoby korzystania z tego zakresu pamięci. Pierwszy z nich wymaga użycia atrybutu ThreadStaticAttribute oraz oznaczenia nim pola. Poniższa klasa ThreadCount deklaruje statyczne pole _Count oznaczone atrybutem ThreadStaticAttribute, przez co CLR utworzy instancję tego pola dla każdego wątku. Wykonanie metody Increment spowoduje zwiększenie wartości w kontekście wątku wykonującego metodę.

ThreadPool.QueueUserWorkItem((obj) => { 
    ThreadCount.Increment();
    Console.WriteLine($""Work A - {ThreadCount.Count}"");
});

ThreadPool.QueueUserWorkItem((obj) => {
    ThreadCount.Increment();
    ThreadCount.Increment();
    ThreadCount.Increment();
    Console.WriteLine($""Work B - {ThreadCount.Count}"");
});

Console.ReadLine();

public static class ThreadCount
{
    [ThreadStatic]
    private static int _Count = 10;

    public static int Count => _Count;

    public static void Increment()
    {
        _Count++;
    }
}

Podczas stosowania atrybutu ThreadStatic nie należy korzystać z inicjalizatora wartości, gdyż inicjalizacja statycznego pola zostanie wykonana dokładnie raz. Użycie statycznej inicjalizacji doprowadzi do sytuacji, w której pierwszy wątek otrzyma prawidłową wartość początkową, natomiast każdy kolejny wątek otrzyma wartość domyślną.

.NET 4.0 wprowadza klasę ThreadLocal<T>, będącą alternatywą dla atrybutu ThreadStaticAttribute. Referencję do instancji można zapisać zarówno w statycznym jak i niestatycznym polu. Wartość przechowywaną przez obiekt ThreadLocal<T> odczytujemy poprzez właściwość Value. Przykład poniżej nawiązuje do przykładu użytego przy okazji ThreadStaticAttribute, jednak tym razem klasa ThreadCount nie jest statyczna, a instancja utworzonego obiektu przekazana do różnych wątków udostępnia wartości związane z wątkiem. Konstruktor klasy ThreadLocal<T> umożliwia przekazanie delegata Func<T>, który zostanie wywołany za każdym razem, gdy nowy wątek użyje wartości. Dzięki temu rozwiązany został problem jednokrotnej inicjalizacji wartości. Inicjalizacja ma charakter leniwy. Dodatkowo, konstruktor umożliwia przekazanie parametru trackAllValues, który powoduje udostępnienie właściwości Values przechowującej kolekcję wszystkich wartości.

var tc = new ThreadCount();

ThreadPool.QueueUserWorkItem((obj) => {
    tc.Increment();
    Console.WriteLine($""Work A - {tc.Count}"");
});

ThreadPool.QueueUserWorkItem((obj) => {
    tc.Increment();
    tc.Increment();
    tc.Increment();
    Console.WriteLine($""Work B - {tc.Count}"");
});

Console.ReadLine();

public class ThreadCount
{
    private ThreadLocal _Count = new ThreadLocal(() => 10);

    public int Count => _Count.Value;

    public void Increment()
    {
        _Count.Value++;
    }
}

Korzystając z pamięci lokalnej wątków, musisz zwrócić uwagę, iż w czasie działania aplikacji możliwe jest utworzenie dużej liczby wątków. Szczególnie w sytuacji, gdy korzystamy z puli wątków lub asynchronicznych możliwości języka. Sam .NET Framework, w miejscach, w których teoretycznie mógłby skorzystać z pamięci lokalnej wątku, jak np. HttpContext.Current, nie robi tego, lecz korzysta z kontekstu wykonywania, reprezentowanego przez klasę ExecutionContext. Bieżący kontekst pobieramy przy użyciu metody Capture. Kontekst wywołania oprócz kontekstu synchronizacji SynchronizationContext, posiada informacje związane z bezpieczeństwem, takie jak to, czy bieżący stos wywołania zawiera kod obdarzony "częściowym zaufaniem". Kontekst wywołania wykorzystywany jest za każdym razem, gdy długotrwała operacja kończy się w innym wątku, niż została rozpoczęta. Jest to związane ze wzorcami asynchronicznego wykonywania kodu. Więcej informacji o asynchroniczności znajdziesz tutaj.

Zgrubnie omówiłem tematykę związaną z wątkami, stanowiącą wstęp do zadań. Zapraszam do dalszej lektury.

Troska Robert