비동기 프로그래밍

비동기 프로그래밍이란?
비동기 프로그래밍이란 무엇인가? 작성한 순서대로 작동하는 프로그래밍 모델을 동기 프로그래밍(synchronous programming) 이라고 부른다.
반대로 비동기 프로그래밍은 작성한 순서대로만 작동하는 것은 아님을 의미하게 되겠다. 정확히는 독립해서 발생하는 이벤트에 대한 처리를 기술하기 위한 동시성 프로그래밍 기법을 총칭해서 비동기 프로그래밍이라고 한다. 비동기 프로그래밍을 이용하면 전화가 울리면 전화를 받는 것과 같이 이벤트에 대응한 작동을 구현할 수 있다.
비동기 프로그램을 구현하는 방법으로 콜백 함수나 시그널(인터럽트)을 이용하는 방법이 있으나 여기서는 Future, async/await를 설명한다. 러스트에서는 async/await을 이용한 비동기 라이브러리로 Tokio, smol이 있다. 또한 nix와 future크레이트를 이용할 것이다. smol github
동시 서버
여기서는 반복 서버, 동시 서버를 알아보고 그 구현을 설명한다.
반복 서버(interactive server): 클라이언트로 요청받은 순서대로 처리하는 서버 동시 서버(concurrent server): 요청을 동시에 처리하는 서버이다.
반복 서버
다음은 단순한 반복 서버의 예시 코드다.
use ::std::io::{BufRead, BufReader, BufWriter, Read};
use std::net::TcpListener;
fn main() {
// TCP 8000번 포트 리스닝
let listener = TcpListener::bind("127.0.0.1:8000").unwrap();
// 커넥션 요청을 받아들이기
while let Ok((stream, _)) = listener.accept() {
// 읽기, 쓰기 객체 생성
let stream0 = stream.try_clone().unwrap();
let mut reader = BufReader::new(stream0);
let mut writer = BufWriter::new(stream);
// 클라이언트로부터 한 줄 읽기
let mut buf = String::new();
reader.read_line(&mut buf).unwrap();
writer.write_all(buf.as_bytes()).unwrap();
writer.flush().unwrap();
}
}
위 코드에서는 커넥션 요청을 받아 클라이언트로부터 데이터를 수신하고 송신 처리를 완료하지 않으면 다음 클라이언트의 처리를 수행할 수 없다.
동시 서버
동시 서버는 클라이언트로부터의 커넥션 요청, 데이터 도착 등의 처리를 이벤트 단위로 세세히 분류하여 이벤트에 따라 처리할 수 있다. 네트워크 소켓이나 파일 등 IO이벤트 감시에는 리눅스에서 epoll BSD 계열에서는 kqueue라는 시스템 콜을 이용할 수 있다.
IO 이벤트 감시는 파일 디스크립터를 감시하는 것이다. 여러 TCP 커넥션이 존재할 때 서버는 여러 개의 파일 디스크립터를 가지는데, 이 디스크립터들에 대해 읽기 쓰기 가능 여부를 epoll 등을 이용해 판정할 수 있다.

그림에서는 프로세스 A가 0~4까지의 파일 디스크립터를 이용하는데, 커널 내부의 프로세스와 디스크립터 정보가 저장되어 있으므로 이 정보들을 이용해 epoll등을 통한 파일 디스크립터 감시를 수행한다.
다음 코드는 epoll을 이용한 병렬 서버의 예시 코드이다. 다만 논블로킹 설정은 수행되지 않았다.
use nix::sys::epoll::{
EpollCreateFlags, EpollEvent, EpollFlags, EpollOp, epoll_create1, epoll_ctl, epoll_wait,
};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::net::TcpListener;
use std::os::unix::io::{AsRawFd, RawFd};
fn main() {
// epoll을 이용해 여러 연결을 동시에 감시하기 위해 준비
let epoll_in = EpollFlags::EPOLLIN;
let epoll_add = EpollOp::EpollCtlAdd;
let epoll_del = EpollOp::EpollCtlDel;
// 서버 소켓 생성
let listener = TcpListener::bind("127.0.0.1:8081").unwrap();
// epoll 인스턴스 생성
let epfd = epoll_create1(EpollCreateFlags::empty()).unwrap();
let listen_fd = listener.as_raw_fd();
// 서버 소켓을 epoll에 등록: 새로운 연결이 들어올 때 알려달라고 커널에 요청
let mut ev = EpollEvent::new(epoll_in, listen_fd as u64);
epoll_ctl(epfd, epoll_add, listen_fd, &mut ev).unwrap();
// 동시성: 클라이언트별 버퍼를 HashMap에 저장
let mut fd2buf = HashMap::new();
let mut events = vec![EpollEvent::empty(); 1024];
// 단일 스레드에서 다수 연결을 epoll_wait로 동시에 처리
while let Ok(nfds) = epoll_wait(epfd, &mut events, -1) {
for n in 0..nfds {
if events[n].data() == listen_fd as u64 {
// 새 클라이언트 접속
if let Ok((stream, _)) = listener.accept() {
let fd = stream.as_raw_fd();
// 각각의 클라이언트 연결을 독립적으로 관리
let stream0 = stream.try_clone().unwrap();
let reader = BufReader::new(stream0);
let writer = BufWriter::new(stream);
fd2buf.insert(fd, (reader, writer));
println!("accept: fd = {}", fd);
// 새 클라이언트 소켓을 epoll에 등록: 이후부터 이 소켓 읽기 이벤트 감시
let mut ev = EpollEvent::new(epoll_in, fd as u64);
epoll_ctl(epfd, epoll_add, fd, &mut ev).unwrap();
}
} else {
// 기존 클라이언트로부터 데이터 도착
let fd = events[n].data() as RawFd;
let (reader, writer) = fd2buf.get_mut(&fd).unwrap();
let mut buf = String::new();
let n = reader.read_line(&mut buf).unwrap();
if n == 0 {
// 클라이언트가 연결을 종료한 경우
let _ev = EpollEvent::new(EpollFlags::empty(), fd as u64);
epoll_ctl(epfd, epoll_del, fd, None).unwrap();
fd2buf.remove(&fd);
println!("close: fd = {}", fd);
continue;
}
println!("read: fd = {}, buf = {}", fd, buf.trim());
// 읽은 데이터를 다시 클라이언트로 전송
writer.write_all(buf.as_bytes()).unwrap();
writer.flush().unwrap();
}
}
}
}
이렇게 epoll 등을 사용해 여러 IO에 대해 동시에 처리를 수행하는 방법을 IO 다중화(IO multiplexing) 이라고 부른다. IO다중화의 방법론의 하나로 위 코드에서처럼 이벤트에 대해 처리를 기술하는 프로그래밍 모델, 디자인 패턴을 이벤트 주도(event-driven) 이라고 한다.

