[회고] 신입 iOS 개발자가 되기까지 feat. 카카오 자세히보기

💻 CS/운영체제

[운영체제] 생산자, 소비자 쓰레드 / 원소적 실행

inu 2020. 5. 5. 15:51
반응형

생산자/소비자 쓰레드

#include <stdio.h>
#include <pthread.h>
void consumer (void);
char buffer[n];
int n, in = 0, out = 0;
int main ()
{
    char nextp; int i;
    pthread_t tid;
    pthread_create (&tid, NULL, consumer,NULL);
    for (i = 0; i < 500; i++) {
        produce an item in nextp
        while ((in+1) % n == out) ; // 대기 loop
        buffer[in] = nextp;
        in++; in %= n;
    }
    pthread_join (tid);
}

void consumer(void)
{
    char nextc;
    for (i = 0; i < 500; i++) {
        while (in == out) ;
        nextc = buff[out];
        out++; out %= n;
        ... 
        consume the item in nextc;
    }
}
  • 결정성이 있는 프로그래밍을 할 수 있는 방법 : 쓰레드 혹은 프로세스 간의 선행관계와 독립적 관계를 명시하고 독립적 관계는 공유변수에 의한 간섭문제 해결해야 한다.
  • 위 코드는 일전에 쓰레드 파트에서 학습했던 producer, consumer 코드이다.
  • 해당 코드에서 buffer는 셀이 n개이다. 하지만 n개를 모두 활용할 수 없고 n-1개만 사용한다. (셀 하나는 채울 수 없다.)
  • 생산자 측에서 입력 후 in++를 하며, 소비측에선 in==out을 empty로 간주하므로 in < out -1 이어야 full과 empty를 구분 할 수 있다.
  • 즉, in과 out이 같은 경우를 empty로 처리하고 있기 때문에 마지막 부분까지 처리하면 이것이 full인지 empty인지 구분이 어려워진다.
// 생산자
while (1) { 
    …
    produce an item in nextp 
    …
    while ( counter == n) ;
    //wait for an empty buffer
    buffer[in] = nextp;
    in = (in + 1) % n ;
    counter = counter + 1;
}
// 소비자
while (1) {
    while (counter == 0) ;
    // wait for an item
    nextc = buffer[out] ;
    out = (out + 1) % n ;
    counter = counter -1; 
    …
    consume the item in nextc 
    …
}
  • n개의 셀 모두를 사용하고 싶으면 버퍼 안의 데이터 갯수를 세서 full인지 empty인지 판단하면 된다.
  • 다만 이렇게 될 경우 개수를 다루기 위한 counter 변수가 필요해진다.
  • 생산자에선 counter를 늘려주고 소비자에서는 줄여준다.

  • 다만 생산자와 소비자에서 동시에 counter 변수에 접근하면 이처럼 기계어 프로세스가 섞이면서 값이 이상해질 수 있다.
  • T0: 생산자가 register1 = counter 수행 {register1 = 5}
  • T1: 생산자가 register1 = register1+1 수행 {register1 = 6}
  • T2: 소비자가 register2 = counter 수행 {register2 = 5}
  • T3: 소비자가 register2 = register2-1 수행 {register2 = 4}
  • T4: 생산자가 counter = register1 수행 {counter = 6}
  • T5: 소비자가 counter = register2 수행 {counter = 4}

원소적(Atomic) 실행

  • 두 쓰레드가 동시에 같은 변수에 접근하면 간섭이 일어나 올바르게 값이 처리되지 않을 위험이 있다.
  • 이렇게 문맥 교환이 일어나도, 서로 간섭이 없도록 실행시키는 방법이 원소적(atomic) 실행이다.
  • 즉, 원소적 실행을 사용하면 공유변수를 통해 상호작용하는 프로세스 혹은 쓰레드 간 문맥교환이 언제 일어나도 간섭없는 실행을 보장하게 되는 것이다.
  • 이를 실현하기 위해서는 임계구역이라는 것을 설정해야 한다. 한 쓰레드가 먼저 임계구역에 진입해 실행 중 문맥교환이 발생되어 다른 쓰레드에 선점을 시도하면 그를 불허하고 대기하도록 하는 것이다. 공유변수를 사용하는 영역에 이런 임계구역을 설정해놓으면 다른 프로세스 혹은 쓰레드와의 충돌로 값이 꼬일 일은 없다.
  • 임계구역 : atomic실행을 위하여 각 프로세스 혹은 쓰래드가 공유변수, 자료구조, 파일 등을 배타적으로 읽고 쓸 수 있도록 설정한 코드의 세그먼트
  • entry section 또는 enter_mutex로 임계구역을 시작하고 exit section 또는 exit_mutex로 임계구역을 종료한다.
  • 이렇듯 임계구역을 설정하여 진입과 진출을 순서화 시키는 것이 동기화라고 한다.
  • // 생산자 while (1) { … produce an item in nextp … while ( counter == n) ; //wait for an empty buffer buffer[in] = nextp; in = (in + 1) % n ; Enter_Mutex(lock); counter = counter + 1; Exit_Mutex(lock); } // 소비자 while (1) { while (counter == 0) ; // wait for an item nextc = buffer[out] ; out = (out + 1) % n ; Enter_Mutex(lock); counter = counter -1; Exit_Mutex(lock); … consume the item in nextc … }
  • 생산자, 소비자 코드에서 위와 같은 코드를 추가함으로서 임계구역 설정할 수 있다.
  • mutex란 mutual exclusion, 즉 상호배제의 의미이다.

pthread_cond_wait() & pthread_cond_signal()

#include <stdio.h>
#include <pthread.h>
//shared variables
int buffer[100];
int count = 0;
int in = -1, out = -1;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t buffer_has_space = PTHREAD_COND_INITIALIZER;
pthread_cond_t buffer_has_data = PTHREAD_COND_INITIALIZER;

int main(void)
{
    int i;
    pthread_t threads[2];
    pthread_create(&threads[0], NULL, producer, NULL);
    pthread_create(&threads[1], NULL, consumer, NULL);
    for (i=0; I < 2; I++)
        pthread_join(threads[i], NULL);
    return 0;
}
  • pthread_cond_wait() : 쓰레드가 휴면상태(대기상태)가 되도록 하며, 휴면상태는pthread_cond_signal()이라는 함수를 통해 깨어남
  • 첫번째 인자: 조건변수(condition variable) - pthread_cond_signal()의 인자로 사용되어야 한다.
  • 두번째 인자: 잠겨진 mutex
  • pthread_cond_t : 버퍼가 full 혹은 empty일 때 해당 상태가 풀릴 때까지 대기하기 위해 필요한 변수
  • buffer_has_space : 버퍼에 공간이 하나는 있다는 의미
  • buffer_has_data : 버퍼에 데이터가 하나는 있다는 의미
void producer (void * arg)
{
    int i;
    for (i =0; i < 1000; i++) {
        pthread_mutex_lock(&mutex);
        if (count == 100)
            pthread_cond_wait(&buffer_has_space, &mutex);
        in++; 
        in %= 100;
        buffer[in] = I;
        count++;
        pthread_cond_signal(&buffer_has_data);
        pthread_mutex_unlock(&mutex);
        printf(“data in: %d\n”, i);
    }
}
  • 버퍼의 내용이 100으로 full이면 생산자에서 더 이상 데이터를 넣을 수 없다.
  • 그런데 이미 mutex_lock을 해놔서 소비자 쪽에서는 버퍼에서 데이터를 가져갈 수 없다.
  • pthread_cond_wait(&buffer_has_space, &mutex)를 활용해서 buffer_has_space 시그널이 올 때까지만 락을 잠시 해제한다.
  • 그럼 소비자가 다시 임계구역 안으로 들어갈 수 있게 되어서 버퍼에서 데이터를 가져갈 수 있게 된다.
  • 데이터를 가져간 순간 소비자는 pthread_cond_signal(&buffer_has_space)을 보내 생산자가 다시 mutex_lock을 하도록 한다. 그리고 생산자는 진행을 속개한다.
void consumer (void *arg)
{
    int i,data;
    for (i =0; i < 1000; i++) {
        pthread_mutex_lock(&mutex);
        if (count == 0)
            pthread_cond_wait(&buffer_has_data,&mutex);
        out++; out %= 100;
        data = buffer[out];
        count--;
        pthread_cond_signal(&buffer_has_space);
        pthread_mutex_unlock(&mutex);
        printf("data = %d\n",data);
    }
}
  • 이 역시 생산자의 경우와 유사하게 돌아간다.
  • 버퍼가 empty일 경우 pthread_cond_wait를 활용해 락을 해제한다.
  • 생산자에서 signal을 보내오면 다시 락을 걸고 작업을 속개한다.

mutex 활용 예제 주석

#include<stdio.h>
#include<string.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
#include <sys/types.h>

#define THREAD_NUM      3

pthread_t tid[THREAD_NUM];
int counter=0;
pthread_mutex_t lock; // 뮤텍스 변수, 뮤텍스를 사용할 때마다 선언

void* doSomeThing(void *arg) // 쓰레드로 불려질 함수
{
    char *t_name = (char *) arg;

    pthread_mutex_lock(&lock);

    printf("\n%s started!  counter=%d\n", t_name, ++counter);
    // counter는 공유변수

    srand((unsigned long) time(NULL));
    unsigned long max_iter = 0xFFFFFFFF - (rand() % 0xFFFFFFFF);

    for(unsigned long i=0; i<max_iter;i++);
    // 일정 시간 소요

    printf("\n%s finished! counter=%d\n", t_name, --counter);

    pthread_mutex_unlock(&lock);

    return NULL;
}

int main(void)
{
    int i = 0;
    int err;
    char strbuf[THREAD_NUM][8];

    if (pthread_mutex_init(&lock, NULL) != 0) // 뮤텍스 변수 초기화
    {
        printf("\n mutex init failed\n");
        return 1;
    }

    for(i=0; i < THREAD_NUM; i++)
    {
        sprintf(strbuf[i], "[T%d]", i); // 쓰레드 이름 설정
        err = pthread_create(&(tid[i]), NULL, &doSomeThing, strbuf[i]);
        if (err != 0)
            printf("\ncan't create thread :[%s]", strerror(err));
    }

    for(i=0; i< THREAD_NUM; i++)
        pthread_join(tid[i], NULL);

    printf("\n");

    return 0;
}
반응형