Współbieżność w C++

Współbieżność w C++

Współbieżność — czyli jednoczesne wykonywanie kilku zadań — jest powszechnie wykorzystywana do zwiększania przepustowości (poprzez użycie kilku procesorów do wykonania jednego zadania) oraz interaktywności (poprzez umożliwienie działania jednej części programu, podczas gdy inna oczekuje na odpowiedź). Wszystkie nowoczesne języki programowania
obsługuję te techniki. Narzędzia dostępne w bibliotece standardowej C++ to przenośne i bezpieczne typowo wersje narzędzi, które są w tym języku już od ponad 20 lat i są obsługiwane prawie przez wszystkie rodzaje nowoczesnego sprzętu. Składniki dostarczane przez bibliotekę standardową mają przede wszystkim za zadanie wspomagać współbieżność na poziomie systemowym, a nie dostarczać wyszukanych wysokopoziomowych modeli współbieżności. Te można dostarczyć w postaci bibliotek utworzonych przy użyciu standardowych narzędzi. Biblioteka standardowa bezpośrednio umożliwia jednoczesne wykonywanie wielu wątków w jednej przestrzeni adresowej. W tym celu w języku C++ zdefiniowano odpowiedni model
pamięci i zestaw operacji atomowych. Jednak większość użytkowników narzędzi współbieżności będzie używać poprzez bibliotekę standardową i inne biblioteki zbudowane na bazie standardowej. W tym podrozdziale znajdują się zwięzłe przykłady wykorzystania narzędzi z biblioteki standardowej umożliwiających programowanie współbieżne: thread, mutex, lock(), packaged_task oraz future. Wszystkie te składniki są zbudowane na bazie narzędzi systemów operacyjnych i nie ustępują im pod względem wydajności.

Zadania i wątki

Obliczenia, które można wykonać równocześnie z innymi obliczeniami, nazywają się zadaniami. Wątek (ang. thread) to reprezentacja na poziomie systemu zadania w programie. Zadanie, które ma zostać wykonane równocześnie z innymi zadaniami, uruchamia się poprzez utworzenie obiektu std::thread (z nagłówka ), przekazując to zadanie jako argument. Zadanie jest
funkcją albo obiektem funkcyjnym:

void f(); // funkcja
struct F { // obiekt funkcyjny
    void operator()(); // operator wywoływania obiektu F (3.4.3)
};
void user()
{
    thread t1 {f}; // f() działa w osobnym wątku
    thread t2 {F()}; // F()() działa w osobnym wątku
    t1.join(); // czeka na t1
    t2.join(); // czeka na t2
}

Funkcja join() sprawia, że wyjście z funkcji user() nie nastąpi, dopóki wątki nie zakończą działania. Słowo join oznacza dołączyć, czyli poczekać, aż wątek zakończy działanie. Wątki w programie używają wspólnej przestrzeni adresowej. Pod tym względem różnią się od procesów, które ogólnie rzecz biorąc, nie dzielą bezpośrednio danych. Jako że wątki korzystają ze wspólnej przestrzeni adresowej, mogą się komunikować poprzez wspólne obiekty. Komunikację taką zwykle kontroluje się przy użyciu blokad i innych mechanizmów zapobiegających wyścigom do danych (niekontrolowanemu dostępowi do zmiennej przez wiele
wątków naraz). Programowanie współbieżnego wykonywania zadań bywa bardzo skomplikowane. Przeanalizujemy możliwą implementację zadań f (funkcja) i F (obiekt funkcyjny):

void f() { cout << "Witaj, "; }
struct F {
    void operator()() { cout << "świecie równoległy! "; }
};

Jest to przykład bardzo paskudnego błędu: zarówno f, jak i F() korzystają z obiektu cout bez jakiejkolwiek synchronizacji. Wynik tego jest nie do przewidzenia i za każdym razem może być inny, bo kolejność wykonywania współbieżnych zadań nie została zdefiniowana. W efekcie na wyjściu może pojawić się jakiś dziwny napis w rodzaju:

świeWitacie, równolejgły!

Zadania w programie równoległym powinny być od siebie całkowicie oddzielone z wyjątkiem sytuacji, w których komunikują się ze sobą w proste i oczywiste sposoby. Zadanie współbieżne najłatwiej jest sobie wyobrazić jako funkcję, która działa równocześnie z programem, który ją wywołał. Aby to działało, konieczne jest przekazywanie argumentów, odbieranie wyników oraz pilnowanie, aby między tymi czynnościami nie było dostępu do wspólnych danych (wyścig do danych).

Przekazywanie argumentów

Typowe zadanie działa na jakichś danych, które (a także wskaźniki i referencje) można łatwo przekazać jako argumenty:

void f(vector& v); // funkcja robiąca coś z v
struct F { // obiekt funkcyjny robiący coś z v
    vector& v;
    F(vector& vv) :v{vv} { }
    void operator()(); // operator wywołania
};
int main()
{
    vector some_vec {1,2,3,4,5,6,7,8,9};
    vector vec2 {10,11,12,13,14};
    thread t1 {f,some_vec}; // f(some_vec) działa w osobnym wątku
    thread t2 {F{vec2}}; // F(vec2)() działa w osobnym wątku
    t1.join();
    t2.join();
}

F{vec} zapisuje referencję do wektora przekazanego jako argument w F. Teraz F może używać tej tablicy i trzeba mieć nadzieję, że żadne inne zadanie nie użyje vec2 podczas działania F. Ryzyko można by było wyeliminować, przekazując vec2 przez wartość.
W inicjacji {f,some_vec} użyty jest konstruktor zmiennego szablonu thread, który może przyjmować dowolną sekwencję argumentów. Kompilator sprawdza, czy można wywołać pierwszy argument, biorąc pod uwagę następne argumenty, i buduje odpowiedni obiekt funkcyjny do przekazania do wątku. Zatem jeżeli F::operator()() i f() wykonują ten sam algorytm, obsługa tych dwóch zadań jest z grubsza jednakowa: w obu przypadkach tworzony jest obiekt funkcyjny do wykonania przez wątek.

Zwracanie wyników

W przykładzie przedstawionym w sekcji argumenty przekazałem przez referencję nie const. Robię tak tylko wtedy, gdy spodziewam się, że zadanie zmodyfikuje wartość danych, do których się odwołuje. Jest to trochę przebiegły, ale dość popularny sposób zwracania wyników. Bardziej klarowna technika polega na przekazaniu danych wejściowych przez referencję const
i określeniu miejsca, w którym ma zostać zapisany wynik w osobnym argumencie:

void f(const vector& v, double*res); // pobiera dane z v; wynik zapisuje w *res
class F {
public:
    F(const vector& vv, double*p) :v{vv}, res{p} { }
    void operator()(); // zapisuje wynik w *res
private:
    const vector& v; // źródło danych wejściowych
    double*res; // miejsce zapisu wyniku
};
int main()
{
    vector some_vec;
    vector vec2;
    //...
    double res1;
    double res2;
    thread t1 {f,some_vec,&res1}; // f(some_vec,&res1) działa w osobnym wątku
    thread t2 {F{vec2,&res2}}; // F{vec2,&res2}() działa w osobnym wątku
    t1.join();
    t2.join();
    cout << res1 << ' ' << res2 << ' ';
}

Wspólne używanie danych

Czasami zadania muszą korzystać z tego samego zbioru danych. W takim przypadku dostęp musi być synchronizowany, aby w określonym czasie zasoby były używane tylko przez jedno z zadań. Doświadczony programista zauważy, że jest to uproszczenie (np. nie ma problemu z jednoczesnym odczytem niezmiennych danych przez wiele zadań), ale zastanówmy się, jak sprawić, aby dostęp do wybranego zestawu obiektów w danym czasie miało maksymalnie jedno zadanie. Kluczowe znaczenie dla rozwiązania tego problemu ma mutex (obiekt wzajemnego wykluczania). Wątek zajmuje muteks przy użyciu operacji lock():

mutex m; // muteks do kontroli
int sh; // wspólne dane
void f()
{
    unique_lock lck {m}; // zajęcie muteksu
    sh += 7; // działania niewspólnych danych
} // niejawne zwolnienie muteksu

Konstruktor unique_lock zajmuje muteks (poprzez wywołanie m.lock()). Jeżeli muteks jest już zajęty przez inny wątek, to wątek starający się go zająć czeka (zostaje zablokowany), aż ten drugi skończy działanie. Gdy pierwszy wątek skończy pracę ze wspólnymi danymi, unique_lock zwalnia muteks (przy użyciu wywołania m.unlock()). Narzędzia do wzajemnego wykluczania
i blokowania znajdują się w nagłówku .

Relacja między wspólnymi danymi a muteksem jest konwencjonalna, tzn. programista musi po prostu wiedzieć, który muteks odpowiada określonej porcji danych. Oczywiście łatwo tu o błąd i dlatego przy użyciu różnych narzędzi języka należy starać się sprawić, aby wszystko było jasne. Na przykład:

class Record {
public:
    mutex rm;
    //...
};

Nie trzeba być geniuszem, żeby się domyślić, że dla rekordu o nazwie rec to rec.rm jest muteksem, który należy zająć przed próbą uzyskania dostępu do pozostałych danych tego obiektu. Chociaż mały komentarz albo lepsza nazwa też by nie zaszkodziły. Często konieczne jest jednoczesne uzyskanie dostępu do kilku zasobów, aby wykonać jakąś czynność. W takim przypadku może powstać zakleszczenie. Przykładowo: jeśli wątek thread1 zajmuje muteks mutex1, a potem próbuje zająć mutex2, podczas gdy wątek thread2 zajmuje muteks mutex2, a następnie próbuje zająć mutex1, to żadne z zadań nie zostanie wykonane.
W bibliotece standardowej znajduje się narzędzie pomocne w takiej sytuacji, umożliwiające zajęcie kilku blokad naraz:

void f()
{
    //...
    unique_lock lck1 {m1,defer_lock}; // defer_lock: nie próbuj jeszcze zająć muteksu
    unique_lock lck2 {m2,defer_lock};
    unique_lock lck3 {m3,defer_lock};
    //...
    lock(lck1,lck2,lck3); // zajmij wszystkie trzy blokady
    //... operacje na wspólnych danych...
} // niejawne zwolnienie wszystkich muteksów

Funkcja lock() rozpocznie wykonywanie swoich działań dopiero wtedy, gdy zajmie wszystkie trzy muteksy określone w argumentach i nigdy nie zablokuje się (nie zostanie uśpiona), mając zajęty muteks. Destruktory obiektów unique_lock zwalniają muteksy, gdy wątek wyjdzie poza zakres.

Komunikacja poprzez wspólne dane to programowanie na bardzo niskim poziomie abstrakcji. W szczególności programista musi wypracować metody dowiadywania się, jakie działania zostały wykonane przez różne zadania, a które jeszcze czekają na wykonanie. Pod tym względem technika posługiwania się wspólnymi danymi jest gorsza od wywołań i zwrotów wartości. Z drugiej strony, niektórzy są przekonani, że współdzielenie danych musi być bardziej wydajne niż kopiowanie argumentów i zwracanie wartości. Rzeczywiście tak może być w przypadkach, gdy w grę wchodzą duże ilości danych, ale blokowanie i odblokowywanie to
względnie kosztowne operacje. Poza tym nowoczesne komputery bardzo dobrze radzą sobie z kopiowaniem danych, zwłaszcza takich kompaktowych, jak elementy wektorów. Dlatego nie należy używać wspólnych danych do komunikacji ze względu na wydajność, jeśli się tego dokładnie nie przemyśli, a zwłaszcza jeśli nie wykona się odpowiednich pomiarów.

Czekanie na zdarzenia

Czasami wątek musi poczekać na jakieś zewnętrzne zdarzenie, np. aż inny wątek zakończy działanie albo minie określona ilość czasu. Najprostsze zdarzenie to upływ czasu. Spójrz na poniższy przykład:

using namespace std::chrono;
auto t0 = high_resolution_clock::now();
this_thread::sleep_for(milliseconds{20});
auto t1 = high_resolution_clock::now();
cout << "Minęło " << duration_cast(t1−t0).count() << " nanosekund ";

Zwróć uwagę, że nie trzeba nawet było uruchamiać wątku. Domyślnie this_thread odnosi się do jedynego wątku. Użyłem funkcji duration_cast w celu zamiany jednostek zegara na nanosekundy.

Narzędzia dotyczące czasu znajdują się w nagłówku . Podstawowe narzędzia do obsługi komunikacji przy użyciu zewnętrznych zdarzeń to zmienne condition_variable z nagłówka . condition_variable to mechanizm umożliwiający jednemu wątkowi poczekanie na inny. Przede wszystkim przy jego użyciu można sprawić, że wątek będzie czekał na spełnienie jakiegoś warunku (często zwanego zdarzeniem) jako efektu działania innego wątku. Zastanówmy się nad klasycznym przykładem komunikacji dwóch wątków poprzez przekazywanie wiadomości przy użyciu kolejki (queue). Dla uproszczenia deklaruję tę kolejkę i mechanizm unikania wyścigów do niej jako globalne dla producenta i konsumenta:

class Message { // obiekt do komunikowania
//...
};
queue mqueue; // kolejka wiadomości
condition_variable mcond; // zmienna komunikująca zdarzenia
mutex mmutex; // mechanizm blokowania

Typy queue, condition_variable oraz mutex są dostarczone przez bibliotekę standardową. Funkcja consumer() wczytuje i przetwarza wiadomości (Message):

void consumer()
{
    while(true) {
        unique_lock lck{mmutex}; // zajęcie muteksu
        while (mcond.wait(lck)) /* nic nie robi */; // zwalnia lck i czeka 
                                                                      // ponownie zajmuje lck po przebudzeniu
        auto m = mqueue.front(); // odbiera wiadomość
        mqueue.pop();
        lck.unlock(); // zwalnia lck
         //... przetwarza m...
    }
}

W kodzie tym jawnie chronię operacje na kolejce i condition_variable przy użyciu blokady unique_lock na muteksie. Oczekiwanie na obiekt condition_variable zwalnia jego blokadę określoną w argumencie do czasu zakończenia oczekiwania (aby kolejka nie była pusta), a potem zajmuje ją z powrotem.

Odpowiedni producent wygląda tak:

void producer()
{
    while(true) {
        Message m;
        //... wypełnienie wiadomości...
        unique_lock lck {mmutex}; // ochrona operacji
        mqueue.push(m);
        mcond.notify_one(); // powiadomienie
    } // zwolnienie blokady (na końcu zakresu)
}

Przy użyciu obiektów conditon_variable można stosować wiele eleganckich i wydajnych form współdzielenia, ale może to być trudne.

 Język C++. Kompendium wiedzy. Wydanie IV, Autor: Bjarne Stroustrup, Wydawnictwo: Helion 

Podobne artykuły

Podziel się ze znajomymi tym artykułem - udostępnij na FB lub wyślij e-maila korzystając z poniższych opcji:

wszystkie oferty