Vlákna

Vícevláknové programování je docela oříšek. Hlavní problém je, jak program odladit (když si každé vlákno může běžet různou rychlostí) a jak zajistit synchronizaci přístupu ke sdíleným prostředkům. Vlákna byste proto měli používat po skrovnu. Na druhou stranu vám vlákna umožní taková kouzla, jako třeba to, že se program nezablokuje jen pro to, kdy čeká na vstup od uživatele. Pro programování her neocenitelné :-).

O vláknech

Vlákna jsou podobné procesům s tím rozdílem, že vlákna jakoby běží uvnitř procesu. Zabijete proces, umřou všechny vlákna. Navíc, všechny vlákna sdílejí stejnou paměť s procesem.

To znamená, že když vytvoříte nové vlákno, nemáte problém se sdílením dat, nemusíte používat sdílenou paměť (ale můžete, v tom vám nic nebrání). Problém máte naopak se synchronizací přístupu k datům. Není moc dobré, když se dvě vlákna perou o jednu proměnnou a mění si ji pod rukama.

Jediné co vlákna nesdílejí, je zásobník. Každé má svůj vlastní. Zásobník vlákna je ale řádově menší než zásobník procesu, takže jej můžete rychle vyčerpat, pokud budete volat mnoho vnořených funkcí a používat velké lokální proměnné, viz příklad Nastavování systémových limitů.

Vlákna se vytvářejí/ruší a synchronizují (pomocí mutexů, viz dále) rychleji, než procesy. Procesy se používají hlavně pro spouštení externích programů pomocí exec*, nebo pokud chcete nastavit procesu jiná uživatelská práva (to na vlákně nejde), nebo když chcete spustit něco, co s původním procesem má jen málo společného a nechete se bát o synchronziaci přístupu k proměnným.

Vytvoření vlákna

V C++ standardu je definována třída std::thread (v knihovně <thread>), která se používá pro vytvoření a práci s vkláknem, ale v C standardu nic takového definovaného není. Proto si budete muset vystačit s „nestandardními“ funkcemi. (Funkce sice nejsou ve standardu C, ale jsou standardizovány normou POSIX. Najdete je tak na většině Linuxů/Unixů, ale už ne třeba ve Windows. Ve Windows samozřejmě taky můžete pracovat s vlákny, ale budete se muset poohléndout po jiných funkcích …)

Implementaci vláken v Linuxech najdete v knihovně libpthread.so, proto musíte překládat programy s volbou -lpthread.

Vlákno se vytvoří pomocí funkce pthread_create() (deklrace v knihovně <pthread.h>).

int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
        void *(*start_routine) (void *), void *arg);

Prvním argumentem je odkaz na proměnnou typu pthread_t, která slouží k přístupu k vláknu. Používají ji další funkce pro práci s vláknem. Druhý argument nastavuje nějaké vlastnosti vlákna (třeba velikost jeho zásobníku). Pokud se spokojíte s defaultními hodnotami (a v tomto článku se spokojíme), můžete předat NULL. Třetím argumentem je odkaz na funkci, která se ve vlánku spustí („main()“ funkce pro vlákno) a čtvrtý argument je argument, který se spuštěné funkci předá. Může také být NULL, když nemáte co předat.

Funkce, která se ve vláknu spouští, void *(*start_routine) (void *) musí mít návratovou hodnotu void *. Když nic vracet nechcete, vracejte NULL. A pozor! Nevracejte nikdy odkaz na lokální proměnnou. Lokální proměnné se po ukončení funkce/vlákna ruší …

Podobně, jako mohou vznikat zombíci u procesů, mohou zůstat v systému viset vlákna (a zabírat tak paměť), pokud nezavoláte funkci pthread_detach() nebo pthread_join();.

int pthread_detach(pthread_t thread);
int pthread_join(pthread_t thread, void **retval);

Funkce pthread_detach() jednoduše říká „návratová hodnota tohoto vlákna mě nezajímá, až vlákno skončí, můžeš ho smazat“.
Funkce pthread_join(); čeká až vlákno skončí a pak uloží odkaz na návratovou hodnotu vlákna do odkazu retval. Pokud vás tato hodnota nezajímá, můžete jako druhý arugment uvést NULL.

Následující příklad funguje podobně jako příklad z předchozí kapitoly pipe1.c.

Pro komunikaci tentokrát stačí jen jedna roura (nenahrazujeme standardní vstup/výstup). Proto se v producentovi i konzumentovi zavírá jen jeden konec po ukončení práce.

Taky si všiměte, jak kontroluji chybu u funkce pthrad_create(). Tato funkce nanastavuje errno, pro zjištění chyby jsem místo toho použil funkce errx()strerror().
Pokud vás tyto funkce zaujali, nastudujte si je z manuálových stránek.

/*------------------------------------------------*/
/* 24threads/threads1.c                           */

#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdbool.h>
#include <stdio.h>
#include <err.h>
#include <string.h> /* kvuli strerror() */

void *producent(void * arg) {
    int i;
    int *pipe = (int *)arg;
    printf("Producent start\n");
    for (i = 0; i <= 5; i++) {
        write(pipe[1], &i, sizeof(i));
    }
    close(pipe[1]);
    printf("Producent konec\n");
    return NULL;
}

void *consumer(void * arg) {
    ssize_t readed;
    int i;
    int *pipe = (int *)arg;
    printf("Consumer start\n");
    do {
        readed = read(pipe[0],&i, sizeof(i));
        if(readed != sizeof(i)) break;
        printf("Načteno %i\n",i);
    } while(true);
    close(pipe[0]);
    printf("Consumer konec\n");
    return NULL;
}

int main(void) {
    pthread_t producent_t, consumer_t;
    int pipefd[2], err;
   
    if(pipe(pipefd) != 0) {
        perror("pipe");
        exit(EXIT_FAILURE);
    }

    err = pthread_create(&producent_t,NULL,producent,(void *)pipefd);
    if(err) {
        errx(1, "pthread_create producent %s\n",strerror(err));
        exit(EXIT_FAILURE);
    }
    err = pthread_create(&consumer_t,NULL,consumer,(void *)pipefd);
    if(err) {
        errx(1, "pthread_create consumer %s\n",strerror(err));
        exit(EXIT_FAILURE);
    }
    pthread_detach(producent_t);
    pthread_join(consumer_t, NULL);
    printf("main konec\n");
    return 0;
}

/*------------------------------------------------*/

V programu jsem vynechal některé kontroly návratových hodnot (třeba funkcí pthread_detach() a pthread_join()), aby byl příklad kratší. V reálné aplikaci návratové hodnoty vždy kontrolujte!

Výstup z programu může vypadat třeba takto:

Consumer start
Producent start
Producent konec
Načteno 0
Načteno 1
Načteno 2
Načteno 3
Načteno 4
Načteno 5
Consumer konec
main konec

Jak vidíte, výsledek se od použití procesů neliší. Jen jsem ušetřil práci s kopírováním paměti (za tu cenu, že vlákna mají méně místa pro zásobník). To, že konzument nastartoval rychleji než producent bylo jen dílem „náhody“.

Vlákna mohou, stejně jako procesy, běžet každé na jiném procesoru v počítači a tím program zrychlit. V Linuxu je to dokonce uděláno tak, že pro každé vlákno se vytváří vlastní proces (má i svůj vlastní PID (process id)). Ale pořád platí, že vlákna sdílejí tu samou paměť (což je většinou výhoda).

Mutexy

To, že vlákna sdílejí stejnou paměť, klade větší nároky na synchronizaci přístupu k paměti. Paměť můžete synchronizovat pomocí semaforů, ale pro vlákna máte i lepší volbu – mutexy.

Mutex = mutual exclusion, neboli vzájemné vyloučení.

Mutex, na rozdíl od semaforu, nemůže odemknout jiné vlákno než to, které ho zamknulo!

Dejte si pozor na to, abyste ve stejném vláknu nezkoušeli zamknout nebo odemknout stejný mutex vícekrát. Takové chování není normou definované, takže to může skončit všelijak.

Mutex můžete inicializovat pomocí makra PTHREAD_MUTEX_INITIALIZER.

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

Pokud ale vytváříte mutex dynamicky (pomocí volání malloc() atp.), musíte použít funkci pthread_mutex_init().

int pthread_mutex_init(pthread_mutex_t *mx, const pthread_mutexattr_t *attr);

Druhý parametr, attr můžete nechat NULL, pokud se spokojíte s defaultním nastavením mutexu.

Po skončení používání mutexu byste jej měli zničit funkcí pthread_mutex_destroy(), aby se uvolnili už nepotřebné zdroje.

int pthread_mutex_destroy(pthread_mutex_t *mutex);

Následující příklad je variací na příklad semafory2.c. Jen s tím rozdílem, že místo nového procesu vytvářím nová vlákna a místo semaforu používám mutex.

Protože nemám rád globální proměnné, předávám funkcím producent() a consument() odkaz na sdílenou paměť i semafor přes jejich argument. Protože je dáno, že funkce spouštěná vláknem musí mít právě jeden argument (typu void *), předávám odkaz na strukturu sdilenaPamet, která obsahuje všechno to, co chci předat.

Kopírování mutexu není normou definováno, což znamená, že kopie mutexu nemusí (a nejspíš nebudou) fungovat. Mutexy proto vždycky předávejte jen odkazem!

/*------------------------------------------------*/
/* 24threads/mutex.c                              */

#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define N 1024*1024

typedef struct {
    pthread_mutex_t * mutex;
    int * m;
} sdilenaPamet;

void * producent(void * arg) {
    int i,j;
    sdilenaPamet *p = (sdilenaPamet *) arg;
    srand(0);
    printf("Producent start\n");
    for (i = 0; i <= 200; i++) {
        pthread_mutex_lock(p->mutex);
        for (j = 0; j < N; j++) {
            memcpy(p->m+j, &i, sizeof(int));
        }
        pthread_mutex_unlock(p->mutex);
        usleep(rand() %50);
    }
    printf("Producent konec\n");
    return NULL;
}

void * consumer(void * arg) {
    static int result = 0;
    sdilenaPamet *p = (sdilenaPamet *) arg;
    int i;
    for (i = 0; i <= 10; i++) {
        pthread_mutex_lock(p->mutex);
        printf("m = %i ... %i", *(p->m), *(p->m+(N-1)));
        result += *(p->m);
        pthread_mutex_unlock(p->mutex);
        usleep(100000);
        printf("\n");
    }
    return &result;
}

int main(void) {
    pthread_t producent_t, consumer_t;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    int m[N];
    sdilenaPamet pamet = { &mutex, m };
    int *result;

    pthread_create(&producent_t,NULL,producent,(void *)&pamet);
    pthread_create(&consumer_t,NULL,consumer,(void *)&pamet);

    pthread_join(producent_t, NULL);
    pthread_join(consumer_t, (void **)&result);
    pthread_mutex_destroy(&mutex);
    printf("main konec, result = %i\n", *result);

    return 0;
}

/*------------------------------------------------*/

V programu jsem vynechal některé kontroly návratových hodnot, aby byl příklad kratší. V reálné aplikaci návratové hodnoty vždy kontrolujte!

Možný výstup z programu:

Producent start
m = 0 ... 134515346
m = 5 ... 5
m = 12 ... 12
m = 18 ... 18
m = 24 ... 24
m = 31 ... 31
m = 37 ... 37
m = 43 ... 43
m = 49 ... 49
m = 55 ... 55
m = 61 ... 61
Producent konec
main konec, result = 335

Co je to za hrozné číslo 134515346? Inu, nijak jsem neinicalizoval pole m[N] a konzument si jeho první i poslední položku přečetl dřív, než do nich producent něco zapsal. Takže se vypsalo nějaké „smetí“. (V příkladu se sdílenou pamětí semafory2.c se tohle stát nemůže, protože nově vytvořená sdílená paměť je automaticky vynulována.)

Buď bych měl pole m[N] před spuštěním vláken vynulovat, nebo by bylo fajn, kdyby konzument nezačal číst dřív, než producent něco zapíše. A ještě lepší by bylo, kdyby producent nezapsal další řadu dat dříve, než si konzument přečte tu poslední zapsanou. A na toto se hodí tzv. podmínkové proměnné.

Podmínkové proměnné

Podmínková proměnná (conditional variable) nemá s podmínkou vlastně nic společného.

Podmínkovým proměnným by se mělo spíše říkat „zámky, které zamknou vlákno, dokud mu nějaké jiné vlákno nepošle signál, že už nastalo, na co zamčené vlákno čeká“.

Tomu důvodu se říká podmínka. Ovšem pozor, vlákno může dostat signál od nějakého záškodníka i když ona podmínka ještě nebyla splněna. Vlákna se mohou, dle normy, vzbudit čas od času sami od sebe. Proto je důležité vždycky po probuzení zkontrolovat, zda událost (podmínka), na kterou vlákno čeká, skutečně nastala.

Podmínková proměnná se vždy váže s nějaký mutexem, který musí mít vlákno uzamčený, aby mohl uzamknout podmínkovou proměnnou. Uzamknutím podmínkové proměnné se tento mutex odemkne. Když dostane vlákno signál, nejdříve mutex zase zamkne a až pak pokračuje ve svém vykonávání. Zní to možná složitě, ale funguje to jednoduše, asi takto:

...
/* Získám mutex */
pthread_mutex_lock(&mutex);
/* Dělám něco se sdílenou pamětí */
...
/* Teď začnu čekat na nějakou podmínku. Tím uvolním mutex.
  Někdo další tak může pracovat se sdílenou pamětí a
  poslat mi signál, když si myslí, že je podmínka splněna.
*/

pthread_cond_wait(&cond, &mutex);
/* Teď si musím ověřit, zda je podmínka skutečně splněna,
  nebo zda bylo vlákno probuzeno systémem bez důvodu!
  Mám ale zpět zámek, takže můžu bez obav pracovat se
  sdílenou pamětí.
*/

...
/* Po dokončení práce se sdílenou pamětí uvolním mutex */
pthread_mutex_unlock(&mutex);

Čekání na splnění podmínky se obvykle dělá v nekonečném cyklu, něco jako:

while(!podminkaSplnena())
        pthread_cond_wait(&cond, &mutex);

Vlákno, které má za úkol podmínku splnit a poslat signál, obvykle pracuje takto:

pthread_mutex_lock(&mutex);
...
/* dělám, co je potřeba pro splnění podmínky */
...
/* oznámím, že je podmínka splňena */
pthread_cond_signal(&cond);
/* oznámení dorazí až po odmčení zámku */
pthread_mutex_unlock(&mutex);

Všiměte si, že se pthread_cond_signal() volá mezi pthread_mutex_lock() a pthread_mutex_unlock(). To je strašně důležité. Nesmí se totiž stát, že první vlákno zavolá pthread_cond_wait() dříve, než druhé vlákno zavolá pthread_cond_signal(). V takovém případě by totiž byl signál ztracen!

Stejně, jako musíte odstranit mutex voláním pthread_mutex_destroy(), musíte odstranit podmínkovou proměnnou voláním pthread_cond_destroy().

V příkadu jsem tentokrát použil globální proměnné (ikdyž jsem je mohl jednoduše přidat do strktury sdilenaPamet) – to jen tak pro příklad :-).

Proměnná zapsano reprezentuje podmínku, na kterou čeká konzument i producent (každý ale na opačnou hodnotu). Všiměte si, jak její hodnotu měním před tím, než pošlu signál. Všiměte si taky, že když její hodnotu testuji, vždycky vlastním mutex, takže mám zaručeno, že mi ji nikdo nezmění pod rukama (protože se mění jen v kódu (protože ji měním jen mezi voláním pthread_mutex_lock() a pthread_mutex_unlock().

Globální proměnná cond je podmínková proměnná. Musí se inicializovat funkcí pthread_cond_init() (viz funkce main()).

Příklad se od toho předchozího liší právě jen přidáním podmínky a podmínkové proměnné.

  1. /*------------------------------------------------*/
  2. /* 24threads/podminky.c                           */
  3.  
  4. #include <pthread.h>
  5. #include <unistd.h>
  6. #include <stdlib.h>
  7. #include <stdbool.h>
  8. #include <stdio.h>
  9. #include <string.h>
  10.  
  11. #define N 1024*1024
  12.  
  13.     pthread_mutex_t * mutex;
  14.     int * m;
  15. } sdilenaPamet;
  16.  
  17. bool zapsano = false;
  18. pthread_cond_t cond;
  19.  
  20. void * producent(void * arg) {
  21.     int i,j;
  22.     sdilenaPamet *p = (sdilenaPamet *) arg;
  23.     srand(0);
  24.     printf("Producent start\n");
  25.     for (i = 0; i <= 10; i++) {
  26.         pthread_mutex_lock(p->mutex);
  27.         while(zapsano) {
  28.             pthread_cond_wait(&cond, p->mutex);
  29.         }
  30.         for (j = 0; j < N; j++) {
  31.             memcpy(p->m+j, &i, sizeof(int));
  32.         }
  33.         zapsano = true;
  34.         pthread_cond_signal(&cond);
  35.         pthread_mutex_unlock(p->mutex);
  36.         usleep(rand() %50);
  37.     }
  38.     printf("Producent konec\n");
  39.     return NULL;
  40. }
  41.  
  42. void * consumer(void * arg) {
  43.     static int result = 0;
  44.     sdilenaPamet *p = (sdilenaPamet *) arg;
  45.     int i;
  46.     for (i = 0; i <= 10; i++) {
  47.         pthread_mutex_lock(p->mutex);
  48.         while(!zapsano) {
  49.             pthread_cond_wait(&cond, p->mutex);
  50.         }
  51.         printf("m = %i ... %i", *(p->m), *(p->m+(N-1)));
  52.         result += *(p->m);
  53.         zapsano = false;
  54.         pthread_cond_signal(&cond);
  55.         pthread_mutex_unlock(p->mutex);
  56.         usleep(100000);
  57.         printf("\n");
  58.     }
  59.     return &result;
  60. }
  61.  
  62. int main(void) {
  63.     pthread_t producent_t, consumer_t;
  64.     pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  65.     int m[N];
  66.     sdilenaPamet pamet = { &mutex, m };
  67.     int *result;
  68.  
  69.     pthread_cond_init(&cond,NULL);
  70.  
  71.     pthread_create(&producent_t,NULL,producent,(void *)&pamet);
  72.     pthread_create(&consumer_t,NULL,consumer,(void *)&pamet);
  73.  
  74.     pthread_join(producent_t, NULL);
  75.     pthread_join(consumer_t, (void **)&result);
  76.     pthread_cond_destroy(&cond);
  77.     pthread_mutex_destroy(&mutex);
  78.     printf("main konec, result = %i\n", *result);
  79.  
  80.     return 0;
  81. }
  82.  
  83. /*------------------------------------------------*/

Ve zdrojáku si taky všiměte, že producent produkuje právě 11 čísel a konzument konzumuje taky 11 čísel. Kdyby jich producent konzumoval víc, zaseknul by se (protože by čekal na to, až konzument čísla přečte) a naopak …

Výstup z programu:

./podminky 
Producent start
m = 0 ... 0
m = 1 ... 1
m = 2 ... 2
m = 3 ... 3
m = 4 ... 4
m = 5 ... 5
m = 6 ... 6
m = 7 ... 7
m = 8 ... 8
m = 9 ... 9Producent konec

m = 10 ... 10
main konec, result = 55

Podmínkových proměnných můžete používat více než jen jendu. Můžete mít X vláken, které se navzájem zamikají jedným mutexem (tj. jen jedno vlákno z nich může běžet), ale mohou čekat na různé podmínky. Pokud několik vláken čeká na stejnou podmínku, pak po zavolání pthread_cond_signal() a pthread_mutex_unlock() se probudí náhodně jen jedno z nich. Pokud se pošle signál různým podmínkám (pthread_cond_signal() se dá zavolat v jednom vlákně pro různé podmínkové proměnné), stále platí, že se probudí jen jedno vlákno. A až toto vlákno mutex uvolní, probudí se to další …

Pozor

Pozor na předávání argumentů

Dávejte si pozor na tuto konstrukci při spouštění více vláken:

for(i = 0; i <= 10; i++) {
    pthread_create(&thread, NULL, funkce, &i);
}

Může se totiž snadno stát (a nejspíš stane), že než vlákno nastartuje a přečte si hodnotu i, cyklus for mezi tím stihne spustit několik dalších vláken a změnit tak hodnotu i, Takže si pak první vlákno v i může přečíst cokoliv mezi 0 a 10 …

Vyřešit to můžete buď tak, že předáte každému vláknu jinou proměnnou, nebo vytvoříte pole do kterého zapíšete příslušné hodnoty a každému vláku předáte odkaz na příslušnou adresu pole, nebo použijete, co já vím, třeba mutexy (zastavíte cyklus spouštící vlákna mutexem, který pak uvolní nastartované vlákno po přečtení (a zkopírování) hodnoty i).

Pozor na MT-Unsafe funkce

K MT-Unsafe funkcím často existuje MT-Safe varianta, například k rand() rand_r().

Tak zrovna funkce rand() je již MT-Safe a funkce rand_r() je proto deprecated.

Zkuste si zadat v konzoli tento příkaz:

apropos _r | egrep "_r "
Komentář Hlášení chyby
Created: 27.8.2014
Last updated: 22.5.2021
Tato stánka používá ke svému běhu cookies, díky kterým je možné monitorovat, co tu provádíte (ne že bych to bez cookies nezvládl). Také vás tu bude špehovat google analytics. Jestli si myslíte, že je to problém, vypněte si cookies ve vašem prohlížeči, nebo odejděte a už se nevracejte :-). Prohlížením tohoto webu souhlasíte s používáním cookies. Dozvědět se více..