Komunikace mezi procesy

V této kapitole ukáži, jak si mohou procesy navzájem posílat data skrze tzv. roury (pipe). Ukáži na příkladu, jak můžete využít jiný existující program – spustíte jej, pošlete mu nějaký vstup a přečtete jeho výstup.
V druhé části se budu zabývat semafory, které slouží k synchronizaci práce se sdílenými prostředky. Konkrétněji, se sdílenou pamětí.

Roury (pipe)

Jak se v Linuxu používají roury byste už měli znát. V shellu se roura vytváří pomocí znaku |. Přesměrovává standardní výstup jednoho programu do standardního vstupu jiného. Například mohu přesměrovat výstup příkazu echo do vstupu příkazu grep:

$ echo -e "ipsum\nLorem\nipsum\ndolor\n sit\namet." | grep "o.*r"
Lorem
dolor

Funkce pipe() vytvoří dva souborové deskriptory roury, jeden pro čtení, druhý pro zápis. S těmito deskriptory můžete skutečně pracovat, jako by šlo o deskriptory k souborům, můžete používat funkce read() a write() a uzavírat je funkcí close().

int pipe(int pipefd[2]);

pipefd[0] slouží pro čtení roury, pipefd[1] pro zápis. toto pořadí je zřejmě kvůli tomu, že se obvykle říká read-write a ne write-read, ačkoliv budete nejdřív do roury zapisovat (do pipefd[1]) a pak teprve z roury číst.

Podle manuálové stránky man 7 pipe je v linuxu velikost bufferu roury 65536 bajtů. Pokud se do roury pokusíte zapsat více, zápis se zablokuje, dokud někdo nějaká data z roury nepřečte a tím neuvolní místo.

Pokud se naopak pokusíte číst z roury, do které nikdo nic nezapsal, bude čtení zablokováno, dokud někdo něco nezapíše, nebo rouru neuzavře – přesněji řečeno, dokud se neuzavře deskriptor pro zápis. Ještě přesněji řečeno, dokud se neuzavřou všechny kopie deskriptoru pro zápis.

Roura se obvykle používá tak, že se vytvoří před forknutím procesu. Nový proces má potom kopii deskriptorů roury a tak mohou oba procesy číst a zapisovat do stejné roury. Obvykle se však provádí skrze rouru jen jednosměrná komunikace, takže jeden proces hned zavře rouru pro čtení a jen zapisuje, druhý naopak.

A to je dobře, protože pro uzavření deskritoru pro čtení/zápis roury musí být uzavřeny všechny kopie deskriptorů pro čtení/zápis. Pokud je v nějakém procesu zapomenete uzavřít, druhý proces se může zablokovat kvůli čtení či zápisu do nekonečna.

Tady je typický příklad producent – konzument. Producent vytváří nějaká data (v příkladě typu integer, ale mohla by to být jakákoliv struktura) a konzument data čte.

  1. /*------------------------------------------------*/
  2. /* 23processKomunikace/pipe1.c                    */
  3.  
  4. #include <unistd.h>
  5. #include <stdlib.h>
  6. #include <stdbool.h>
  7. #include <stdio.h>
  8.  
  9. void producent(int pipe[2]) {
  10.     int i;
  11.     close(pipe[0]); /* nebudu cist */
  12.     for (i = 0; i <= 5; i++) {
  13.         write(pipe[1], &i, sizeof(i));
  14.     }
  15.     close(pipe[1]);
  16. }
  17.  
  18. void consumer(int pipe[2]) {
  19.     ssize_t readed;
  20.     int i;
  21.     close(pipe[1]); /* nebudu zapisovat */
  22.     do {
  23.         readed = read(pipe[0],&i, sizeof(i));
  24.         if(readed != sizeof(i)) break;
  25.         printf("Načteno %i\n",i);
  26.     } while(true);
  27.     close(pipe[0]);
  28. }
  29.  
  30. int main(void) {
  31.     pid_t pid;
  32.     int pipefd[2];
  33.    
  34.     if(pipe(pipefd) != 0) {
  35.         perror("pipe");
  36.         exit(EXIT_FAILURE);
  37.     }
  38.     pid = fork();
  39.     if(pid == 0) {
  40.         producent(pipefd);
  41.     } else {
  42.         consumer(pipefd);
  43.     }
  44.     return 0;
  45. }
  46.  
  47. /*------------------------------------------------*/

V tomto i v dalších příkladech jsem vynechal některé, jinak nutné, kontroly chyb, aby nebyl příklad příliš dlouhý. Například nekontroluji úspěch funkcí fork() nebo close().

Výstup z programu:

Načteno 0
Načteno 1
Načteno 2
Načteno 3
Načteno 4
Načteno 5

Využití externího programu

V tomto příkladu využiji program grep. První argument mého programu předám programu grep jako řetězec, který chci hledat (filter), ostatní argumenty pošlu do standardního vstupu grepu jako samostatné řádky. Jeho výstup pak načtu a zobrazím.

Použiji k tomu dvě roury. Jednu jsem pojmenoval pipefdin, tu použiji pro vstup pro grep. Druhou jsem pojmenoval pipefdout a z té zase budu číst výstup programu grep.

Celý trik funguje takto. Standardní vstup má file descriptor (fd) roven 0 a standardní výstup má fd 1. Můžete i použít konstanty STDIN_FILENOSTDOUT_FILENO. Pomocí funkce dup2() přiřadím deskriptorům z roury tyto čísla. Program grep bude pořád pracovat s fd 0 a 1, ale ty už budou odkazovat na moje roury.

 int dup2(int oldfd, int newfd);

Funkce dup2() vytvoří nový deskriptor newfd jako kopii deskriptoru oldfd. Pokud číslo v newfd je existující otevřený deskriptor, tak jej zavře.

Volání dup2(pipefdin[0], STDIN_FILENO); zavře standardní vstup a nahradí jej kopií pipefdin[0]. Proto je vhodné poté pipefdin[0] uzavřít.

Rodič se musí v příkladu forknout 2x. Jednou kvůli spušětní porgramu grep, podruhé kvůli tomu, aby mohl v jednom procesu data zapisovat a v druhém je číst. Kdybyste chtěli data nejdřív zapsat a až pak přečíst, mohlo by se stát, že při zápisu zaplníte rouru, protože z ní nikdo nečte, čímž se navěky zablokujete.

Můj program čte data z příkazové řádky, která, pokud se nepletu, nemůže být dlouhá 65535 bajtů, takže můžete říct, že jsou mé obavy liché. Mohl bych se na to spolehnout a program tak zkrátit a urychlit. To by bylo ale lajdáctví. Co když jednou budou operační systémy, které toto omezení mít nebudou? Můj program přestane pracovat. Co když se rozhodnu přepsat program tak, aby četl data odjinud, než z příkazové řádky? Zase to přestane fungovat …
  1. /*------------------------------------------------*/
  2. /* 23processKomunikace/pipe2.c                    */
  3.  
  4. #include <unistd.h>
  5. #include <stdlib.h>
  6. #include <stdbool.h>
  7. #include <stdio.h>
  8. #include <string.h>
  9.  
  10. #define N 512
  11.  
  12. int main(int argc, char *argv[]) {
  13.     pid_t pid;
  14.     int pipefdin[2];
  15.     int pipefdout[2];
  16.     char * filter;
  17.     char buff[N];
  18.     int i;
  19.  
  20.     if(argc < 3) {
  21.         fprintf(stderr,"Usage: %s filter word [word, ...]\n", argv[0]);
  22.         return 0;
  23.     }
  24.     filter = argv[1];
  25.    
  26.     if(pipe(pipefdin) != 0 || pipe(pipefdout) != 0) {
  27.         perror("pipe");
  28.         exit(EXIT_FAILURE);
  29.     }
  30.  
  31.     pid = fork();
  32.     if (pid < 0) {
  33.         perror("fork");
  34.         exit(EXIT_FAILURE);
  35.     }
  36.     if(pid == 0) {
  37.         dup2(pipefdin[0], STDIN_FILENO);
  38.         close(pipefdin[0]);
  39.         close(pipefdin[1]);
  40.         dup2(pipefdout[1], STDOUT_FILENO);
  41.         close(pipefdout[0]);
  42.         close(pipefdout[1]);
  43.         execlp("grep","grep",filter,NULL);
  44.         /* execlp ukonci process */
  45.         /* Kdyz ne, je to chyba */
  46.         perror("execlp");
  47.         exit(EXIT_FAILURE);
  48.     }
  49.     /** rodic **/
  50.     close(pipefdin[0]);
  51.     close(pipefdout[1]);
  52.    
  53.     pid = fork();
  54.     if (pid < 0) {
  55.         perror("fork");
  56.         exit(EXIT_FAILURE);
  57.     }
  58.     if (pid == 0) {
  59.         close(pipefdout[0]);
  60.         for (i = 2; i < argc; i++) {
  61.             write(pipefdin[1], argv[i], strlen(argv[i]));
  62.             write(pipefdin[1], "\n", strlen("\n"));
  63.         }
  64.         close(pipefdin[1]);
  65.     }
  66.     else {
  67.         close(pipefdin[1]);
  68.         do {
  69.             i = read(pipefdout[0],buff, N-1);
  70.             if(!i) break;
  71.             buff[i] = '\0';
  72.             printf("Prošlo filtrem:\n%s", buff);
  73.         } while (i);
  74.         close(pipefdout[0]);
  75.     }
  76.  
  77.     return 0;
  78. }
  79.  
  80. /*------------------------------------------------*/

Ukázka použití programu:

$ ./pipe2 "o.*r" ipsum Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Prošlo filtrem:
Lorem
dolor
consectetur

No není to nádhera? Můj program dostal, jakoby zázrakem, schopnost filtrovat argumenty podle regulárních výrazů!

Pojmenované roury

Pojmenované roury (named pipe) fungují stejně jako „anonymní“ roury. Rozdíl je v tom, že jsou reprezentovány speciálním souborem na souborovém systému. Otevíráte je funkcí open(), jako jakýkoliv jiný soubor, buď pro čtení, nebo pro zápis.

Tento speciální soubor můžete vytvořit linuxovým příkazem mkfifo, nebo funkcí mkfifo().

int mkfifo(const char *pathname, mode_t mode);

Výhodou pojmenované roury je, že ji mohou použít nezávislé procesy (programy), nejen procesy co se rozforkovali.

Hezký a jednoduchý příklad najdete na Stack Overflow. Další příklad uvidíte později v kapitole o socketech.

Semafory

V následujícím příkladu vytvořím sdílenou paměť, do které bude producent v cyklu zapisovat dlouhou řadu stejného čísla a konzument načte a zobrazí vždy první a poslední číslo z řady.

Jako klíč sdílené paměti tentokrát použiji IPC_PRIVATE, což znamená, že nechám výběr klíče na operačním systému, aby vybral nějaký nevyužitý.

Sdílená paměť je dost velká (1024x124 bajtů), aby se ukázalo, co chci ukázat.

/*------------------------------------------------*/
/* 23processKomunikace/semafory1.c                */

#include <sys/types.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <stdio.h>

#define N 1024*1024

void producent(int segment_id) {
    int *m;
    int i,j;
    srand(0);
    m = (int *) shmat(segment_id, NULL, 0);
    printf("Producent start\n");
    for (i = 0; i <= 200; i++) {
        for (j = 0; j < N; j++) {
            memcpy(m+j, &i, sizeof(int));
        }
        usleep(rand() %50);
    }
    printf("Producent konec\n");
    shmdt(m);
}

void consumer(int segment_id) {
    int *m;
    int i;
    m = (int *) shmat(segment_id, NULL, 0);
    for (i = 0; i <= 10; i++) {
        printf("m = %i ... %i", *m, *(m+(N-1)));
        usleep(100000);
        printf("\n");
    }
    shmdt(m);
}

int main(void) {
    pid_t pid;
    int segment_id;

    segment_id = shmget(IPC_PRIVATE, N*sizeof(int), IPC_CREAT | S_IRUSR | S_IWUSR);

    pid = fork();
    if(pid == 0) {
        producent(segment_id);
    } else {
        consumer(segment_id);
        waitpid(pid, NULL, 0);
    }
    return 0;
}

/*------------------------------------------------*/

Výstup z programu:

Producent start
m = 0 ... 0
m = 8 ... 7
m = 14 ... 13
m = 20 ... 19
m = 26 ... 25
m = 32 ... 31
m = 38 ... 37
m = 44 ... 43
m = 51 ... 50
m = 57 ... 56
m = 63 ... 62
Producent konec

Asi není těžké uhodnout, o co tu jde. Zatímco producent je někde v půlce svého cyklu, například začal zapisovat číslo 8, konzument načetl první a poslední číslo, přičemž poslední číslo je ještě pořád 7.

Konzument tak dostává nekonzistentní data. Představte si, že místo řady čísel zapisuje producent nějakou strukturu. Konzument by načetl strukturu, která by se skládala z části z jedné struktury a z části z jiné. A to je peklo!

Je potřeba nějak zajistit, aby se producent a konzument o přístup ke sdílené paměti střídali. A k tomu právě slouží semafory.

Pojmenované semafory

Semafor je sdílená struktura, která má atomické funkce pro své zapnutí a vypnutí. Atomicita znamená, že se nemůže stát, že by se dva procesy podívali v jeden okamžik na semafor, řekli si že je zelený, nastavili ho na červený a jali se pracovat se sdílenou pamětí.

Semafor funguje tak, že jej proces nastaví na „červenou“ (pro ostatní procesy), pracuje se sdílenou pamětí a pak jej zase nastaví na „zelenou“. (Tou červenou a zelenou se samo sebou myslí jedničky a nuly.)

Semafor, typu sem_t, se vytvoří funkcí sem_open():

sem_t *sem_open(const char *name, int oflag,
                       mode_t mode, unsigned int value);

Jméno name je jméno semaforu. Funguje podobně jako klíč pro sdílenou paměť, nebo spíš jméno pojmenované roury. Vlastně se pro takto vytvořený semafor skutečně vytvoří soubor stejného jména ve speciálním souborovém sytému /dev/shm.

Příznaky oflag jsou podobné, jako při vytváření sdílené paměti: O_CREAT i O_EXCL.

Příznaky v mode slouží k nastavení přístupových práv (podobně jako pro sdílenou paměť, nebo pro soubory otevírané funkcí open()).

Poslední příznak je hodnota semaforu. 1 znamená zelená, 0 znamená červená. Pokud otevřete už existující semafor, tato hodnota se ignoruje.

Semafor byste také měli po skončení práce uzavřít:

int sem_close(sem_t *sem);

No a teď to nejzásadnější. Pokud chcete pracovat se sdíleným prostředkem, pokusíte se nastavit na semaforu červenou funkcí sem_wait(). Pokud na semaforu už červená je, funkce se zablokuje, dokud jiný proces nenastaví zelenou. Pak teda funkce sem_wait() nastaví červenou a odblokuje se (skončí). Zelenou nastavíte funkcí sem_post().

int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);

Následující příklad je stejný jako předchozí příklad, jen jsem jej doplnil o semafory.

  1. /*------------------------------------------------*/
  2. /* 23processKomunikace/semafory2.c                */
  3.  
  4. #include <sys/types.h>
  5. #include <sys/shm.h>
  6. #include <sys/stat.h>
  7. #include <sys/types.h>
  8. #include <sys/wait.h>
  9. #include <unistd.h>
  10. #include <stdlib.h>
  11. #include <stdbool.h>
  12. #include <string.h>
  13. #include <stdio.h>
  14. #include <fcntl.h>
  15. #include <semaphore.h>
  16.  
  17.  
  18. #define N 1024*1024
  19. #define SEM_NAME "mujSemafor"
  20.  
  21. void producent(int segment_id) {
  22.     int *m;
  23.     sem_t *sem;
  24.     int i,j;
  25.     srand(0);
  26.     m = (int *) shmat(segment_id, NULL, 0);
  27.     sem = sem_open(SEM_NAME, O_CREAT, S_IWUSR | S_IRUSR, 1);
  28.     printf("Producent start\n");
  29.     for (i = 0; i <= 200; i++) {
  30.         sem_wait(sem);
  31.         for (j = 0; j < N; j++) {
  32.             memcpy(m+j, &i, sizeof(int));
  33.         }
  34.         sem_post(sem);
  35.         usleep(rand() %50);
  36.     }
  37.     printf("Producent konec\n");
  38.     sem_close(sem);
  39.     shmdt(m);
  40. }
  41.  
  42. void consumer(int segment_id) {
  43.     int *m;
  44.     sem_t *sem;
  45.     int i;
  46.     m = (int *) shmat(segment_id, NULL, 0);
  47.     sem = sem_open("mujSemafor", O_CREAT, S_IWUSR | S_IRUSR, 1);
  48.     for (i = 0; i <= 10; i++) {
  49.         sem_wait(sem);
  50.         printf("m = %i ... %i", *m, *(m+(N-1)));
  51.         sem_post(sem);
  52.         usleep(100000);
  53.         printf("\n");
  54.     }
  55.     sem_close(sem);
  56.     shmdt(m);
  57. }
  58.  
  59. int main(void) {
  60.     pid_t pid;
  61.     int segment_id;
  62.  
  63.     segment_id = shmget(IPC_PRIVATE, N*sizeof(int), IPC_CREAT | S_IRUSR | S_IWUSR);
  64.  
  65.     pid = fork();
  66.     if(pid == 0) {
  67.         producent(segment_id);
  68.     } else {
  69.         consumer(segment_id);
  70.         waitpid(pid, NULL, 0);
  71.         /* sem_unlink(SEM_NAME); */
  72.     }
  73.     return 0;
  74. }
  75.  
  76. /*------------------------------------------------*/

Všiměte si, jak semafor zíkávám těsně před prací se sdíleným prostředkem a zase jej co nejdříve uvolňuji. Neblokujte jiné procesy déle, než je to nutné.

Příklad musíte přeložit s knihovnou pthread (volba -lpthread).

Výstup z programu:

Producent start
m = 0 ... 0
m = 9 ... 9
m = 20 ... 20
m = 31 ... 31
m = 42 ... 42
m = 53 ... 53
m = 61 ... 61
m = 68 ... 68
m = 75 ... 75
m = 83 ... 83
m = 89 ... 89
Producent konec

Pokud byste chtěli, aby konzument přečetl každou řádku producenta, měli byste lepší použít roury. Nebo můžete použít tzv. conditional variables o kterých dám řeč až v souvislosti s vlákny.

Můžete se přesvědčit, že byl semafor skutečně vytvořen:

$ ls -l /dev/shm/ | egrep mujSemafor
-rw------- 1 petr users       16 27. srp 18.32 sem.mujSemafor

Semafor můžete normálně smazat příkazem rm, nebo funkcí sem_unlink() (v příkladu je její použití zakomentované).

int sem_unlink(const char *name);

Nepojmenované semafory

Je tu samozřejmě riziko, že si dva programátoři vymyslí stejný název semaforu a budou si lézt navzájem do práce. S tím toho moc nenaděláte, musíte tvořit jména tak, abyste toto riziko minimalizoval, například jako to dělá program Adobe Reader: ADBE_WritePrefs_petr (do jména semaforu vložil jméno uživatele).

Další možností je použít nepojmenované semafory, analogicky jako se používá IPC_PRIVATE u sdílené paměti. Nepojmenované semafory se ukládají ve sdílené paměti. Inicializují se funkcí sem_init() a ruší funkcí sem_destroy().

Za domácí úkol upravte příklad semafory2.c tak, aby vyhradil na začátku sdílené paměti místo pro nepojmenovaný semafor a ten pak použijte pro synchronizaci. Pozor! sem_init() můžete volat jen jednou. Udělejte to ještě před tím, než process zavolá fork().

Řešení najdete ve zdrojových souborech jako příklad semafory3.c. Použil jsem tam jednu sdílenou paměť jak pro semafor, tak pro sdílená data. Můžete si vytvořit pro semafor i data sdílenou paměť zvlášť, pokud chcete (asi by pak bylo řešení i čitelnější, jen trochu delší na psaní).

Komentář Hlášení chyby
Vytvořeno: 27.8.2014
Naposledy upraveno: 10.10.2014
Tato stánka používá ke svému běhu cookies, díky kterým je možné monitorovat, co tu provádíte (ne že bych to bez cookies nezvládl). Také vás tu bude špehovat google analytics. Jestli si myslíte, že je to problém, vypněte si cookies ve vašem prohlížeči, nebo odejděte a už se nevracejte :-). Prohlížením tohoto webu souhlasíte s používáním cookies. Dozvědět se více..