동기 처리 1-2
의사 각성
의사 각성의 정의는 다음과 같다.
의사 각성: 특정한 조건이 만족될 때까지 대기 중이어야 하는 프로세스가 해당 조건이 만족되지 않았음에도 실행 상태로 변경되는 것
의사 각성은 어떤 경우에 일어나는가?
다음 코드는 의사 각성을 일으키는 C 코드 예시이다.
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 시그널 핸들러: 시그널 번호 표시
void handler(int sig) { printf("recieved signal %d\n", sig); }
int main(int argc, char *argv[]) {
// 프로세스 ID 취득, 표시.
pid_t pid = getpid();
printf("pid: %d\n", pid);
// SIGUSR1 시그널 핸들러 등록
signal(SIGUSR1, handler);
// wait처리, 그러나 notify하는 스레드가 없음.
// 영원히 대기?
pthread_mutex_lock(&mutex);
if (pthread_cond_wait(&cond, &mutex) != 0) {
perror("pthread_cond_wait");
exit(EXIT_FAILURE);
}
printf("spurious wake up\n");
pthread_mutex_unlock(&mutex);
return 0;
}
이 코드는 영원히 대기해야할 것 같지만 SIGUSR1 시그널이 송신되고 프로그램이 종료될 수 있다.
이유는
pthread_mutex_lock(&mutex);
if (pthread_cond_wait(&cond, &mutex) != 0) {
perror("pthread_cond_wait");
exit(EXIT_FAILURE);
}
여기서 pthread_cond_wait()는 pthread_cond_signal()이나 pthread_cond_broadcast() 때문에만 돌아오는 함수가 아니기 때문이다.
POSIX 규격에서 조건 변수를 기다리던 스레드는 아무 이유 없이 깨어날 수 있다고 허용하고 있다.
Spurious wakeups from the _pthread_cond_timedwait_() or _pthread_cond_wait_() functions may occur.
리눅스에서는 내부적으로 futex라는 시스템 콜을 이용하는데 커널 버전 2.6.22 이전에서는 futex에 의해 의사 각성이 발생했지만 이후 버전에는 발생하지 않는다.
정확히는 커널이 2.6.22 이후 안정화되어 내부 race로 인한 spurious wakeup은 줄어든 게 사실이지만 POSIX 규격상 spurious wakeup의 가능성을 명시하고 있는 것이 표준이기 때문에 완전히 배제된 개념은 아니라고 보는 게 옳다고 본다.
시그널
시그널이란 특정한 사건이 발생했다고 프로세스에게 알려주기 위해 UNIX 시스템에서 사용하는 알림 매커니즘이다. 일반적으로 시그널과 멀티스레드는 궁합이 맞지 않다고 알려져 있는데, 어떤 타이밍에서 시그널 핸들러가 호출되는지 알 수 없기 때문이다.
아래 코드는 시그널 핸들러를 사용할 때 데드락이 발생하는 전형적인 시그널 락 데드락 문제이다.
void handler(int sig) {
pthread_mutex_lock(&mutex); // 데드락
// 무언가 처리하기
pthread_mutex_unlock(&mutex);
}
int main(int argc, char *argv[]) {
pthread_mutex_lock(&mutex);
// 이 시점에서 시그널 발생
// handler()진입
// handler 안에서 또 다시 pthread_mutex_lock(&mutex)
pthread_mutex_unlock(&mutex);
return 0;
}
위 코드는 어떤 스레드가 뮤텍스를 가진 상태에서 다시 락을 요청하게 되어 블록되는 상황이다.
위와 같은 상태에 빠지는 것을 방지하기 위해 시그널을 수신하는 전용 스레드를 이용할 수 있다.
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
sigset_t set;
void *handler(void *arg) {
pthread_detach(pthread_self());
int sig;
for (;;) {
if (sigwait(&set, &sig) != 0) {
perror("sigwait");
exit(1);
}
printf("Received signal %d\n", sig);
pthread_mutex_lock(&mutex);
// Simulate some work
pthread_mutex_unlock(&mutex);
}
return NULL;
}
void *worker(void *arg) {
for (int i = 0; i < 10; i++) {
pthread_mutex_lock(&mutex);
// Simulate some work
sleep(1);
pthread_mutex_unlock(&mutex);
sleep(1);
}
return NULL;
}
int main(int argc, char *argv[]) {
pid_t pid = getpid();
printf("pid: %d\n", pid);
// SIGSUR1 시그널을 블록으로 설정
sigemptyset(&set);
sigaddset(&set, SIGUSR1);
if (pthread_sigmask(SIG_BLOCK, &set, NULL) != 0) {
perror("pthread_sigmask");
exit(1);
}
pthread_t th, wth;
pthread_create(&th, NULL, handler, NULL);
pthread_create(&wth, NULL, worker, NULL);
pthread_join(wth, NULL);
return 0;
}
중요한 것은 pthread_sigmask 함수에서 시그널을 블록하는 것과 시그널 수신용 스레드를 준비해 sigwait함수에서 동기로 시그널을 수신하는 것이다. 이러면 어떤 타이밍에 시그널이 발생해도 데드락 상태가 되지 않는다.
어떤 시그널을 블록으로 설정하면 해당 시그널이 프로세스에 송신되어도 시그널 핸들러가 실행되지 않는다.
러스트에서는 signal_hook이라는 크레이트가 있으며 시그널을 다룰 때는 이 크레이트를 사용할 것을 권장한다.
use libc::SIGUSR1;
use signal_hook::iterator::Signals;
use std::sync::{Arc, Mutex};
use std::{error::Error, process, thread, time::Duration};
fn main() -> Result<(), Box<dyn Error>> {
println!("pid: {}", process::id());
// 수신 대상 시그널인 SIGUSR1지정하여 SIGNALS 타입 생성
let signals = Signals::new([SIGUSR1])?;
let signals = Arc::new(Mutex::new(signals));
let worker_signals = Arc::clone(&signals);
let handle = thread::spawn(move || {
let mut signals = worker_signals.lock().unwrap();
for sig in signals.forever() {
println!("received signal: {:?}", sig);
}
});
thread::sleep(Duration::from_secs(10));
// 종료 직전 스레드 join
handle.join().ok();
Ok(())
}
위와 같이 시그널 수신 스레드를 작성할 수 있다. 시그널을 수신한 것을 여러 스레드에 알리고 싶을 때는 crossbeam-channel 크레이트를 이용할 수 있다.
메모리 배리어
현대적인 CPU에서는 반드시 기계어 명령 순에 따라서만 처리를 수행하지는 않는다. 이러한 실행을 아웃 오브 오더 실행이라고 한다. 아웃 오브 오더 실행을 하는 이유는 파이프라인 처리 시 단위 시간당 실행 명령 수(instructions-per-second, IPC) 를 높이기 위해서이다.
예를 들면 메모리상에만 존재하는 주소 A와 캐시 라인상에 존재하는 주소 B가 있고 read A, read B순으로 기계어가 있을 때, 반드시 A->B순으로 읽는 것이 효율적이지는 않다. 이러한 경우에 아웃 오브 오더 실행은 IPC 향상에 기여할 수 있다.
아웃 오브 오더 실행은 IPC 향상에 도움이 되기도 하지만 몇 가지 문제를 발생시킬 수 있다. 이 파트의 제목이기도 한 메모리 배리어(메모리 펜스) 는 아웃 오브 오더 실행에 관한 여러 문제에서 시스템을 보호하기 위한 처리이다.

위 그림에서, 프로세스 A와 B는 공유 변수에 접근하기 위해 락을 획득하고 락 획득 중에 공유 변수를 증가한다고 가정한다. 그러나 A의 write(v+1)작업이 끝나기 이전 B가 이를 읽어오기 때문에 최종 결괏값으로 2를 기대했으나 1이 도출되는 상황이 된다. 그러나 실제로는 메모리 배리어 명령에 의해 메모리 읽기 쓰기 순서가 보증되기 때문에 위와 같은 일은 벌어지지 않는다.
AArch64의 메모리 배리어 명령 중 일부를 소개하자면 아래 사진과 같다.

이 중 dmb는
dmb:dmb sy: 이 명령어를 기준으로 앞의 모든 load/store가 완료되기 전에는 이후의 load/store가 시작되지 않음dmb st: 이 명령 이전의 store과 이후의 store간의 순서만 보장하고, load는 관여하지 않음.dmb ld: 이 명령 이전의 load와 이후의 load간의 순서만 보장.
그림으로는

와 같다.
메모리 오더링(memory ordering) 은 메모리 읽기 쓰기를 수행하는 순서를 말하고 기계어 순서와 다르게 실행되는 것을 리오더링이라 부른다. 리오더링이 발생하는 명령 순서는 읽기 쓰기 순서나 CPU 아키텍처에 따라 달라진다.

러스트 아토믹
러스트 언어에도 아토믹 변수를 다루는 라이브러리인 std::sync::atomic이 존재한다. 러스트에서는 아토믹 변수를 읽고 쓸 떄 메모리 배리어의 방법을 지정해야 하고 이 때 Ordering타입을 이용한다.
러스트에서의 Ordering은 아래 표와 같다.
| Ordering | Load 이후 연산 순서 | Store 이전 연산 순서 | 설명 | 사용 예 |
Relaxed | ❌ | ❌ | 순서 보장 없음 (원자성만 보장됨) | 단순 카운터, 통계 수집 |
Acquire | O | ❌ | Load 이후 연산 순서 보장 | 락 취득(load-acquire) |
Release | ❌ | O | Store 이전 연산 순서 보장 | 락 해제(store-release) |
AcqRel | O | O | Acquire + Release 결합(load, store) | CAS, 교환 연산 |
SeqCst | O (전역적으로) | O (전역적으로) | 전역 순서까지 보장하는 가장 강한 모델 | 대부분의 락 없는 동기화 |
러스트에서 뮤텍스는
보호 대상 데이터에는 락 후에만 접근 가능
락 해제는 자동으로 되는 특성이 있었다.
따라서 러스트의 아토믹 변수를 이용한 스핀락을 구현하면 아래와 같다.
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
const NUM_THREADS: usize = 4;
const NUM_LOOP: usize = 100_000;
// 스핀락 구조체 정의
struct SpinLock<T> {
locked: AtomicBool,
data: UnsafeCell<T>,
}
// 락 가드 구조체
struct SpinLockGuard<'a, T> {
spin_lock: &'a SpinLock<T>,
}
impl<T> SpinLock<T> {
fn new(data: T) -> Self {
SpinLock {
locked: AtomicBool::new(false),
data: UnsafeCell::new(data),
}
}
// 락 함수
fn lock(&self) -> SpinLockGuard<T> {
loop {
// 빠른 경합 회피용 빠른 스핀
while self.locked.load(Ordering::Relaxed) {
std::hint::spin_loop(); // CPU friendly spin
}
if self
.locked
.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
break;
}
}
SpinLockGuard { spin_lock: self }
}
fn unlock(&self) {
self.locked.store(false, Ordering::Release);
}
}
// unsafe marker: SpinLock<T>는 T가 Send일 때만 Send/Sync 가능
unsafe impl<T: Send> Send for SpinLock<T> {}
unsafe impl<T: Send> Sync for SpinLock<T> {}
// SpinLockGuard는 Deref/DerefMut 제공
impl<'a, T> Deref for SpinLockGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.spin_lock.data.get() }
}
}
impl<'a, T> DerefMut for SpinLockGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.spin_lock.data.get() }
}
}
// Drop 구현으로 unlock
impl<'a, T> Drop for SpinLockGuard<'a, T> {
fn drop(&mut self) {
self.spin_lock.unlock();
}
}
fn main() {
let lock = Arc::new(SpinLock::new(0));
let mut handles = Vec::new();
for _ in 0..NUM_THREADS {
let lock_clone = Arc::clone(&lock);
let handle = std::thread::spawn(move || {
for _ in 0..NUM_LOOP {
let mut data = lock_clone.lock();
*data += 1;
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let result = lock.lock();
println!("Final result: {}", *result);
}
UnsafeCell 사용 없이 구현하면 위와 같다.

