ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [I/O multiplexing] select vs kqueue
    NetWork 2024. 1. 7. 23:59

      I/O multiplexing이 가능한 채팅 프로그램을 고도화한 경험을 공유합니다. I/O multiplexing 구현을 위한 system call을 O(n)의 시간복잡도를 가지는 'select'에서 시간복잡도 O(1)의 'kqueue'로 변경했습니다. 본 포스팅의 목차는 아래와 같습니다.

     

    1. I/O multiplexing 개념 
      - multiplexing의 정의 
      - I/O model의 종류
      - I/O multiplexing의 종류

    2. 코드로 확인하는 select vs kqueue

    3. trouble shooting: 표준입력은 READ_EVENT 이다!

     

     

    1. I/O multiplexing 개념

      본격적으로 select와 kqueue를 비교하기 전, 'I/O multiplexing 채팅 프로그램'의 배경이 되는 몇 가지 개념을 톺아보겠습니다.

     

    1-1) multiplexing의 정의 

      우선 multiplexing(다중화)이란, '한 프로세스가 여러 파일을 관리'하는 기법입니다. 이를 채팅 프로그램에 적용시켜보면 하나의 전송로를 분할하여 독립된 신호를 개별적으로 동시에 송수신할 수 있는 다수의 채널을 구성하게 됩니다. 즉, 하나의 서버에서 여러 socket(file)을 관리하여 여러 클라이언트가 동시에 접속할 수 있도록 하는 것이죠.

     

    💡 "Everything is a file"
    참고로 Unix 계열의 운영체제에서는 socket도 file의 일종으로 여겨집니다. 아래에서 코드로 확인하실 수 있겠지만 socket을 통해 데이터를 읽거나 쓰는 행위도 File Descriptor를 사용합니다. 파일, 소켓, 파이프, 터미널 등 모든 입출력 장치는 '입출력 수행을 위한 인터페이스'인 File Descriptor를 통해 조작됩니다.

     

     

      저의 채팅 프로그램에서는 데이터 전송 프로토콜로 TCP를 사용하고 있습니다. TCP에서 하나의 서버가 다수의 클라이언트와 독립적인 통신 채널을 유지하는 방법은 다음과 같습니다. 서버는 아래 그림과 같이 클라이언트와의 연결 수립용 소켓을 가지고 있습니다. 그러다가 클라이언트가 연결 수립용 소켓으로 연결을 요청하고 연결이 수립되면 별도의 통신용 소켓을 생성합니다. 연결 수립 이후에는 통신용 소켓으로만 통신합니다. 

    TCP 서버가 다수의 클라이언트와 독립적인 통신 채널을 유지하는 방법

     

    1-2) I/O 모델 종류
      I/O모델의 종류는 작업 순서(Sync/Async) 작업 완료 대기 여부(Blocking/Non-Blocking) 구분하며, '작업 순서' '대기 여부'  가지 특성은 독립적이므로 I/O 모델에는 Syncronous-Blocking, Syncronous-NonBlocking, Asyncronous-Blocking, Asyncronous-NonBlocking 4 종류가 존재합니다.

    작업 순서에 따른 구분
       - Sync: 작업 순서를 보장합니다. user space에서 작업 완료를 판단하므로 현재 작업에 대한 응답 시점과 다음 작업을 요청하는 시점이 일치합니다. 
       - Async: 작업 순서를 보장하지 않습니다. kernel의 작업이 완료되는 순서대로 call back 형식으로 동작합니다.

    대기 여부에 따른 구분
      - Blocking: 요청한 I/O 작업에 대한 완료 시그널을 kernel로부터 받기 전까지 다른 작업을 하지 않고 기다립니다.
      - Non-Blocking: kernel에 I/O 작업을 요청한 후 기다리지 않고 다른 작업을 합니다. (polling으로 대기 중에 작업 상태 확인은 가능).

     

    I/O 모델 종류 (참고: https://developer.ibm.com/articles/l-async/)

     

      multiplexing 예측 불가능하게 인입되는 다수의 클라이언트의 요청을 '비동기적'으로(Asyncronous) 수행하며, I/O system call에 대한 kernel의 응답은 ‘Block’되므로 Asynchronous - Blocking 영역에 해당됩니다. 하지만 system call 실제적인 이벤트 처리에 있어서는 하나의 FD I/O 작업이 완료되어야 다음 FD 작업으로 넘어가기 때문에 동기적으로 동작(Syncronous)합니다. 또한 user space에서의 I/O 작업 자체는 Block되지 않으므로 I/O multiplexing무조건 Asyncronous - Blocking 모델이라고 단정지을 수는 없습니다.

     

     

    1-3) I/O multiplexing 종류

      I/O multiplexing 종류에는 select, pselect, poll, ppoll, epoll, kqueue, iocp, devpoll 있으며, FD 관리 방법  관리 주체에 따라 크게 ① user 레벨의 라이브러리와 ② kernel 레벨의 라이브러리  가지로 구분합니다. 즉, 'FD를 어떻게 감시하느냐'가 I/O multiplexing의 핵심 쟁점입니다.

     

    ① user 레벨의 라이브러리 - select, pselect, poll, ppoll

      select, pselect, poll, ppoll는 user space에서 FD를 감시합니다. 이 중에서도 가장 원초적인 방식인 select 함수로 설명하겠습니다. application에서 select함수를 호출하면 kernel로 context switch가 일어나면서 kernel에서 select 함수가 실행됩니다. 이때 select 함수에는 user space 메모리에 저장되어 있던 '감시 대상 FD 목록'과 'timeout' 등의 인자가 전달됩니다.

     

      kernel에서는 해당 FD들을 모니터링하며, 이벤트가 발생하거나 timeout이 초과하면 blocking을 해제하고 각 FD의 이벤트 발생 여부 bit값이 저장된 fd_set을 user space로 넘겨줍니다. user space에서는 fd_set 전체를 순환하며 이벤트가 발생하여 bit 값이 1 FD 찾아냅니다. 이와 같이 select 함수가 이벤트 처리에 소요하는 시간은 감시 대상 FD 수에 비례하여 선형으로 증가하므로 시간복잡도는 O(n) 입니다.

     

    system call로 select를 사용한 Asynchronous - Blocking 모델의 데이터 처리방식 (참고: https://developer.ibm.com/articles/l-async/)

    - select: 이벤트 발생 시마다 관리하는 모든 FD를 검사하는 원초적 방식, 최대로 관리 가능한 FD의 개수는 1024개
    - pselect: select의 timeout 정밀도와 signal 처리 로직이 개선된 버전
    - poll: 이벤트 발생한 FD의 개수만큼만 검사, select는 하나의 이벤트 처리에 3bit를 사용하는 반면 poll은 64bit를 사용하므로 FD 수가 많아질수록 select보다 비효율적, 관리 가능한 FD 개수는 무제한
    - ppoll: poll의 timeout 정밀도와 signal 처리 로직이 개선된 버전

     

     

    ② kernel 레벨의 라이브러리 - epoll, kqueue, iocp, devpoll

      epoll, kqueue, iocp, devpoll은 I/O 성능 문제를 개선하기 위해 FD 상태를 kernel space에서 관리합니다. kernel 레벨의 라이브러리는 kernel space에서 바로 이벤트가 발생한 FD 리스트를 user space에 넘겨주므로 이벤트 발생 FD를 찾기 위해 fd_set을 순환하는 과정이 없습니다. 따라서 이벤트 처리를 위한 시간복잡도가 O(1) 줄어듭니다. 운영체제에서 상태 변경을 감지하기 때문에 아래와 같이 운영체제별로 사용할  있는 라이브러리가 정해져 있습니다. 저는 Mac OS 유저이므로 kqueue를 사용했습니다. 

    - epoll(Linux)
    - kqueue(OpenBSD)
    - iocp(I/O completion port)(Windows)
    - devpoll(Solaris)

     

     

      참고로 I/O system call 함수에 전달되는 timeout 인자값에 따른 blocking 상태는 다음과 같습니다.

     - timeout 인자가 생략될 경우 kernel의 응답은 이벤트가 발생할 때까지 block됩니다.

     - timeout 값이 0이면 절대 block되지 않습니다.

     - timeout 값이 0이 아닌 양수이면 이벤트가 발생하지 않더라도 해당 시간이 지나면 blocking이 해제됩니다. 

     

     

    2. 코드로 확인하는 select vs kqueue

      select와 kqueue의 비교를 위해 이벤트 처리 부분을 집중적으로 살펴보겠습니다.

     

    2-1) select

    아래는 파이썬으로 select를 구현한 예제입니다. 파이썬의 select 함수는 이벤트가 발생한 FD의 리스트를 반환합니다. 하지만 C언어로 구현된 select 함수 원형을 살펴보면 fd_set에서 bit값이 1인 필드의 '개수'를 반환합니다. 파이썬에서도 이벤트 발생 FD를 찾기 위한 내부 구현이 숨겨져있을 뿐 이벤트 처리에 대한 시간복잡도는 O(n)입니다. 

    import socket
    import select
    
    # 서버 소켓 생성
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(("127.0.0.1", 9000))
    server_socket.listen(5)
    
    sockets = [server_socket]
    
    while True:
        # I/O 이벤트를 모니터링하고 발생한 이벤트를 반환
        read_sockets, _, _ = select.select(sockets, [], [], 5)
    
        for sock in read_sockets:
            # 새로운 클라이언트 연결
            if sock == server_socket:
                client_socket, addr = server_socket.accept()
                sockets.append(client_socket)
                print(f"New connection from {addr}")
            # 기존 클라이언트로부터 데이터 수신
            else:
                data = sock.recv(1024)
                if data:
                    print(f"Received data: {data.decode()}")
                else:
                    # 클라이언트 연결이 종료된 경우
                    print(f"Connection closed")
                    sock.close()
                    clients.remove(sock)

     

      아래는 C언어의 select 함수 원형입니다.

    #include <sys/select.h>
    
    int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

     - nfdf: FD 갯수. 설정 시 0~(nfds-1) 만큼의 배열을 검사함
     - readfds: 해당 값이 null이 아니면 read 이벤트
     - writefds: 해당 값이 null이 아니면 write 이벤트
     - errorfds: 해당 값이 null이 아니면 error
     - timeout: null인경우 절대 block하지 않음, 
                         0인 경우 이벤트 발생할 때까지 무한 대기, 
                         0이 아닌 양수인 경우 해당 시간만큼만 block
     - return: 1 이상인 경우 이벤트가 발생한 FD의 갯수
                       0인 경우 timeout 발생
                       -1인 경우 error 발생

     

     

    2-2) kqueue

      다음으로는 kqueue 함수를 사용한 예제입니다. kevent()로 이벤트 필터와 이벤트를 추가한 소켓을 kqueue()로 queue에 등록해놓으면 I/O 이벤트가 발생할 때마다 kernel로부터 이벤트가 발생한 fd_set을 받는 구조입니다. 다시 말해 kernel에서 kqueue()로 할당받은 queue를 kevent()로 관리하며, user space에서 이벤트 발생 FD를 찾기 위한 추가 작업이 필요하지 않아 시간 복잡도가 O(1)입니다.

     

      참고로 아래 예제에서는 kqueue 객체를 얻기 위해 저수준 select 모듈의 kqueue 함수를 사용했으나 채팅 프로그램의 실제 구현은 고수준 selectors 모듈의 KqueueSelector 클래스를 사용했습니다. selectors 모듈을 사용하면 소켓의 FD와 주소를 직접 관리할 필요가 없습니다. 전체 코드는 저의 github에서 확인하실 수 있습니다.

    https://github.com/Yoojin-An/chatting-room

    import socket
    import select
    
    # 서버 소켓 생성
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(("127.0.0.1", 9000))
    server_socket.listen(5)
    
    # kqueue 객체 생성
    kq = select.kqueue()
    
    # 서버 소켓에 이벤트 필터 및 이벤트 추가
    server_event = select.kevent(server_socket.fileno(), filter=select.KQ_FILTER_READ, flags=select.KQ_EV_ADD)
    kq.control([server_event], 0)
    
    # 소켓 관리 딕셔너리
    sockets = {server_socket.fileno(): server_socket}
    
    while True:
        # I/O 이벤트를 모니터링하고 발생한 이벤트를 반환
        events = kq.control(None, 10, None)
    
        for event in events:
            # 새로운 클라이언트 연결
            if event.ident == server_socket.fileno():
                client_socket, addr = server_socket.accept()
                sockets[client_socket.fileno()] = client_socket
                print(f"New connection from {addr}")
                # 클라이언트 소켓에 이벤트 필터 및 이벤트 추가
                client_event = select.kevent(client_socket.fileno(), filter=select.KQ_FILTER_READ, flags=select.KQ_EV_ADD)
                kq.control([client_event], 0)
            # 기존 클라이언트로부터 데이터 수신
            elif event.filter == select.KQ_FILTER_READ:
                client_socket = sockets[event.ident]
                data = client_socket.recv(1024)
                if data:
                    print(f"Received data: {data.decode()}")
                else:
                    # 클라이언트 연결이 종료된 경우
                    print(f"Connection closed")
                    client_socket.close()
                    # kqueue에서 이벤트 제거 및 클라이언트 리스트에서 삭제
                    kq.control([select.kevent(event.ident, filter=select.KQ_FILTER_READ, flags=select.KQ_EV_DELETE)], 0)
                    del clients[event.ident]

     

    3. trouble shooting: 표준입력은 READ_EVENT이다!

      I/O multiplexing systemcall을 kqueue로 고도화하면서 알게 된 점을 정리하며 마무리하겠습니다. 클라이언트는 1) sys.stdin과 2) 서버와 통신하기 위한 자신의 소켓 두 가지 FD를 감시해야 합니다. 자신이 입력한 메세지를 감시하거나 서버로 부터 수신한 다른 클라이언트가 입력한 메세지를 감시하는 것이죠. kqueue로 원하는 FD를 감시하기 위해서는selectors.KqueueSelector.regitster() 에 'FD', '해당 FD의 이벤트 유형', '해당 이벤트 발생 시 callback되는 함수'를 파라미터로 넘겨 등록하면 됩니다.

     

      그런데 클라이언트의 표준입력을 '클라이언트가 메세지를 WRITE해야 표준입력이 일어난다'고 생각해서 WRITE_EVENT로 착각했습니다. 따라서 아래와 같이 sys.stdin FD를 WRITE_EVENT에 대해서 감시하고, WRITE_EVENT가 발생할 때 사용자 정의 wirte 함수가 콜백되도록 selectors.KqueueSelector() 객체에 등록했습니다.

    sel = selectors.KqueueSelector()
    sel.register(sys.stdin, selectors.EVENT_WRITE, write) # 표준 입력 FD를 kqueue selector에 WRITE 이벤트 감시 대상으로 등록

     

      하지만 selectors.KqueueSelector() 객체는 sys.stdin에 의하여 클라이언트가 메세지를 입력하는 이벤트를 READ하고 있었어야 합니다. 즉, 클라이언트가 메세지를 입력하면 해당 입력이 stdin으로 전달되어 READ_EVENT가 발생하는 것입니다. 따라서 아래와 같이 코드를 수정했습니다. 요약하면 클라이언트 측 어플리케이션에서 selectors.KqueueSelectors()에 등록하는 두 가지 FD (sys.stdin, 서버 통신용 소켓) 모두 READ 이벤트 감시 대상입니다.

    sel = selectors.KqueueSelector()
    sel.register(sys.stdin, selectors.EVENT_READ, read) # 표준 입력 FD를 kqueue selector에 READ 이벤트 감시 대상으로 등록

     

     

    참고: 

    https://man.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2

     

    kqueue(2)

    FreeBSD Manual Pages KQUEUE(2) System Calls Manual KQUEUE(2) NAME kqueue, kevent -- kernel event notification mechanism LIBRARY Standard C Library (libc, -lc) SYNOPSIS #include int kqueue(void); int kqueuex(u_int flags); int kevent(int kq, const struct kev

    man.freebsd.org


    https://incredible-larva.tistory.com/entry/IO-Multiplexing-%ED%86%BA%EC%95%84%EB%B3%B4%EA%B8%B0-1%EB%B6%80

     

    I/O Multiplexing 톺아보기 (1부)

    “Everything is a File” 이 말은 Linux/Unix에서는 socket도 하나의 파일(File), 더 정확히는 File Descriptor(FD, 파일 디스크립터)로 관리된다는 것에서 착안되었다. 이처럼 Low Level File Handling을 통해 socket 기반

    incredible-larva.tistory.com

     

    https://www.linkedin.com/pulse/user-level-vs-kernel-level-threads-comparing-thread-highlighting-n/

     

    User-level vs. Kernel-level Threads: Comparing user-level and kernel-level thread implementations, highlighting their advantages

    User-level threads and kernel-level threads are two different approaches to implementing multithreading in an operating system. Each approach has its own advantages and disadvantages, and the choice between them depends on the specific requirements and con

    www.linkedin.com

     

    https://www.youtube.com/watch?v=mb-QHxVfmcs

     

    https://www.youtube.com/watch?v=XNGfl3sfErc

     

    댓글

Designed by Tistory.