#include <stdio.h>
#include <pthread.h>
void consumer (void);
char buffer[n];
int n, in = 0, out = 0;
int main ()
{
char nextp; int i;
pthread_t tid;
pthread_create (&tid, NULL, consumer,NULL);
for (i = 0; i < 500; i++) {
produce an item in nextp
while ((in+1) % n == out) ; // 대기 loop
buffer[in] = nextp;
in++; in %= n;
}
pthread_join (tid);
}
void consumer(void)
{
char nextc;
for (i = 0; i < 500; i++) {
while (in == out) ;
nextc = buff[out];
out++; out %= n;
...
consume the item in nextc;
}
}
결정성이 있는 프로그래밍을 할 수 있는 방법 : 쓰레드 혹은 프로세스 간의 선행관계와 독립적 관계를 명시하고 독립적 관계는 공유변수에 의한 간섭문제 해결해야 한다.
위 코드는 일전에 쓰레드 파트에서 학습했던 producer, consumer 코드이다.
해당 코드에서 buffer는 셀이 n개이다. 하지만 n개를 모두 활용할 수 없고 n-1개만 사용한다. (셀 하나는 채울 수 없다.)
생산자 측에서 입력 후 in++를 하며, 소비측에선 in==out을 empty로 간주하므로 in < out -1 이어야 full과 empty를 구분 할 수 있다.
즉, in과 out이 같은 경우를 empty로 처리하고 있기 때문에 마지막 부분까지 처리하면 이것이 full인지 empty인지 구분이 어려워진다.
// 생산자
while (1) {
…
produce an item in nextp
…
while ( counter == n) ;
//wait for an empty buffer
buffer[in] = nextp;
in = (in + 1) % n ;
counter = counter + 1;
}
// 소비자
while (1) {
while (counter == 0) ;
// wait for an item
nextc = buffer[out] ;
out = (out + 1) % n ;
counter = counter -1;
…
consume the item in nextc
…
}
n개의 셀 모두를 사용하고 싶으면 버퍼 안의 데이터 갯수를 세서 full인지 empty인지 판단하면 된다.
다만 이렇게 될 경우 개수를 다루기 위한 counter 변수가 필요해진다.
생산자에선 counter를 늘려주고 소비자에서는 줄여준다.
다만 생산자와 소비자에서 동시에 counter 변수에 접근하면 이처럼 기계어 프로세스가 섞이면서 값이 이상해질 수 있다.
T0: 생산자가 register1 = counter 수행 {register1 = 5}
T1: 생산자가 register1 = register1+1 수행 {register1 = 6}
T2: 소비자가 register2 = counter 수행 {register2 = 5}
T3: 소비자가 register2 = register2-1 수행 {register2 = 4}
T4: 생산자가 counter = register1 수행 {counter = 6}
T5: 소비자가 counter = register2 수행 {counter = 4}
원소적(Atomic) 실행
두 쓰레드가 동시에 같은 변수에 접근하면 간섭이 일어나 올바르게 값이 처리되지 않을 위험이 있다.
이렇게 문맥 교환이 일어나도, 서로 간섭이 없도록 실행시키는 방법이 원소적(atomic) 실행이다.
즉, 원소적 실행을 사용하면 공유변수를 통해 상호작용하는 프로세스 혹은 쓰레드 간 문맥교환이 언제 일어나도 간섭없는 실행을 보장하게 되는 것이다.
이를 실현하기 위해서는 임계구역이라는 것을 설정해야 한다. 한 쓰레드가 먼저 임계구역에 진입해 실행 중 문맥교환이 발생되어 다른 쓰레드에 선점을 시도하면 그를 불허하고 대기하도록 하는 것이다. 공유변수를 사용하는 영역에 이런 임계구역을 설정해놓으면 다른 프로세스 혹은 쓰레드와의 충돌로 값이 꼬일 일은 없다.
임계구역 : atomic실행을 위하여 각 프로세스 혹은 쓰래드가 공유변수, 자료구조, 파일 등을 배타적으로 읽고 쓸 수 있도록 설정한 코드의 세그먼트
entry section 또는 enter_mutex로 임계구역을 시작하고 exit section 또는 exit_mutex로 임계구역을 종료한다.
이렇듯 임계구역을 설정하여 진입과 진출을 순서화 시키는 것이 동기화라고 한다.
// 생산자
while (1) {
…
produce an item in nextp
…
while ( counter == n) ;
//wait for an empty buffer
buffer[in] = nextp;
in = (in + 1) % n ;
Enter_Mutex(lock);
counter = counter + 1;
Exit_Mutex(lock);
}
// 소비자
while (1) {
while (counter == 0) ;
// wait for an item
nextc = buffer[out] ;
out = (out + 1) % n ;
Enter_Mutex(lock);
counter = counter -1;
Exit_Mutex(lock);
…
consume the item in nextc
…
}
생산자, 소비자 코드에서 위와 같은 코드를 추가함으로서 임계구역 설정할 수 있다.
mutex란 mutual exclusion, 즉 상호배제의 의미이다.
pthread_cond_wait() & pthread_cond_signal()
#include <stdio.h>
#include <pthread.h>
//shared variables
int buffer[100];
int count = 0;
int in = -1, out = -1;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t buffer_has_space = PTHREAD_COND_INITIALIZER;
pthread_cond_t buffer_has_data = PTHREAD_COND_INITIALIZER;
int main(void)
{
int i;
pthread_t threads[2];
pthread_create(&threads[0], NULL, producer, NULL);
pthread_create(&threads[1], NULL, consumer, NULL);
for (i=0; I < 2; I++)
pthread_join(threads[i], NULL);
return 0;
}
pthread_cond_wait() : 쓰레드가 휴면상태(대기상태)가 되도록 하며, 휴면상태는pthread_cond_signal()이라는 함수를 통해 깨어남
첫번째 인자: 조건변수(condition variable) - pthread_cond_signal()의 인자로 사용되어야 한다.
두번째 인자: 잠겨진 mutex
pthread_cond_t : 버퍼가 full 혹은 empty일 때 해당 상태가 풀릴 때까지 대기하기 위해 필요한 변수
buffer_has_space : 버퍼에 공간이 하나는 있다는 의미
buffer_has_data : 버퍼에 데이터가 하나는 있다는 의미
void producer (void * arg)
{
int i;
for (i =0; i < 1000; i++) {
pthread_mutex_lock(&mutex);
if (count == 100)
pthread_cond_wait(&buffer_has_space, &mutex);
in++;
in %= 100;
buffer[in] = I;
count++;
pthread_cond_signal(&buffer_has_data);
pthread_mutex_unlock(&mutex);
printf(“data in: %d\n”, i);
}
}
버퍼의 내용이 100으로 full이면 생산자에서 더 이상 데이터를 넣을 수 없다.
그런데 이미 mutex_lock을 해놔서 소비자 쪽에서는 버퍼에서 데이터를 가져갈 수 없다.
pthread_cond_wait(&buffer_has_space, &mutex)를 활용해서 buffer_has_space 시그널이 올 때까지만 락을 잠시 해제한다.
그럼 소비자가 다시 임계구역 안으로 들어갈 수 있게 되어서 버퍼에서 데이터를 가져갈 수 있게 된다.
데이터를 가져간 순간 소비자는 pthread_cond_signal(&buffer_has_space)을 보내 생산자가 다시 mutex_lock을 하도록 한다. 그리고 생산자는 진행을 속개한다.
void consumer (void *arg)
{
int i,data;
for (i =0; i < 1000; i++) {
pthread_mutex_lock(&mutex);
if (count == 0)
pthread_cond_wait(&buffer_has_data,&mutex);
out++; out %= 100;
data = buffer[out];
count--;
pthread_cond_signal(&buffer_has_space);
pthread_mutex_unlock(&mutex);
printf("data = %d\n",data);
}
}
이 역시 생산자의 경우와 유사하게 돌아간다.
버퍼가 empty일 경우 pthread_cond_wait를 활용해 락을 해제한다.
생산자에서 signal을 보내오면 다시 락을 걸고 작업을 속개한다.
mutex 활용 예제 주석
#include<stdio.h>
#include<string.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
#include <sys/types.h>
#define THREAD_NUM 3
pthread_t tid[THREAD_NUM];
int counter=0;
pthread_mutex_t lock; // 뮤텍스 변수, 뮤텍스를 사용할 때마다 선언
void* doSomeThing(void *arg) // 쓰레드로 불려질 함수
{
char *t_name = (char *) arg;
pthread_mutex_lock(&lock);
printf("\n%s started! counter=%d\n", t_name, ++counter);
// counter는 공유변수
srand((unsigned long) time(NULL));
unsigned long max_iter = 0xFFFFFFFF - (rand() % 0xFFFFFFFF);
for(unsigned long i=0; i<max_iter;i++);
// 일정 시간 소요
printf("\n%s finished! counter=%d\n", t_name, --counter);
pthread_mutex_unlock(&lock);
return NULL;
}
int main(void)
{
int i = 0;
int err;
char strbuf[THREAD_NUM][8];
if (pthread_mutex_init(&lock, NULL) != 0) // 뮤텍스 변수 초기화
{
printf("\n mutex init failed\n");
return 1;
}
for(i=0; i < THREAD_NUM; i++)
{
sprintf(strbuf[i], "[T%d]", i); // 쓰레드 이름 설정
err = pthread_create(&(tid[i]), NULL, &doSomeThing, strbuf[i]);
if (err != 0)
printf("\ncan't create thread :[%s]", strerror(err));
}
for(i=0; i< THREAD_NUM; i++)
pthread_join(tid[i], NULL);
printf("\n");
return 0;
}