ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [python] RuntimeError(There is no current event loop in thread 'ThreadPoolExecutor-0_0') 해결 - ensure_future()와 create_task()의 차이점
    Trouble Shooting/Structural Error 2023. 5. 7. 11:58

     저는 파이썬을 사용하는 서버 개발자로서 회사에서 주로 asyncio를 활용한 비동기 프로그래밍을 하고 있습니다. 저번주 업무는 여러 개의 oid에 대해서 snmp walk를 비동기로 수행하는 엔진을 만드는 것이었는데 작업 중 예상치 못한 에러를 만났습니다. 이것저것 시도해보다 에러는 잡았는데 이유가 이해되지 않아 거의 1주일을 고민했습니다. context switching, thread, coroutine등의 개념을 알고 나서야 어느 정도 이해가 되어 그 과정을 공유하고자 합니다.

     

     에러가 발생한 부분은 async def로 정의한 코루틴 함수(table_task)를 asyncio.ensure_future()의 인자로 전달해 task로 반환시켜 task list(vendor_tasks)에 추가하는 코드였고, 해당 코드에서 RuntimeError가 발생했습니다.

    vendor_tasks.append(asyncio.ensure_future(self.table_task(host, table_nm, oid_list))) # 에러 발생
    
    loop.run_until_complete(*vendor_tasks)

     

    RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

     

    에러 캡처 화면

     

     엔진의 구동은 다음과 같이 이루어집니다. real.py, main.py, manager.py 세 가지의 파이썬 스크립트 파일이 있습니다. main.py를 통해 manager.run 함수를 호출해 다수의 oid에 대해 snmp walk를 비동기로 수행하는 것이 메인 로직입니다.

     

     manager.run함수를 호출하는 방식은 두 가지 입니다. 첫 번째는 main.py를 daemon으로 띄워놓고 웹에서 manager.run()를 실행시키기 위한 메세지를 Publish 하면 real.py가 REDIS Pub/Sub 기능을 사용해 Subscribe해서 main.py를 자동으로 실행시킵니다. 두 번째 방식은 command line에서 수동으로 main.py를 실행시키는 것입니다. 이상한 점은 수동 실행 시에는 에러가 발생하지 않았는데 real.py를 통해 자동으로 main.py를 실행할 때만 위의 RuntimeError가 발생하는 것이었습니다.

     

    main을 실행시키는 두 가지 방법과 manager의 역할

     

     핵심만 남긴 manger.py의 소스코드는 아래와 같으며, 코루틴을 Task 객체로 만들기 위한 함수로 asyncio.ensure_future()를 사용하고 있습니다.

     

    # manager.py
    
    class L4Chgmgr:
        async def collect(oid_list):
            tasks = [asyncio.ensure_future(oid.snmp_walk()) for oid in oid_list]
            await asyncio.gather(*tasks)
    
    class Manager:
        def __init__(self):
            self.loop = asyncio.get_event_loop()
            self.targets = get_job_list()
        
        async def table_task(host, table_nm, oid_list):
            await L4Chgmgr.collect(oid_list)
      
        def _run(self):
            vendor_tasks = []
            for target in self.targets:
                # 에러 발생
                vendor_tasks.append(asyncio.ensure_future(self.table_task(host, table_nm, oid_list))
            
            self.loop.run_until_complete(asyncio.gather(*vendor_tasks))
        
        def run(self):
            with threading.Lock():
               self._run()

     

     한편 awaitable한 Task를 만들 수 있는 함수는 고수준 asyncio. create_task(), 저수준 loop.create_task(), 저수준 asyncio.ensure_future()가 있는데 asyncio 공식 문서와 아래 서적(파이썬 비동기 라이브러리 Asyncio)을 참조한 결과 코루틴임이 확실한 인스턴스를 스케줄링하여 새로운 Task로 만들고 싶은 경우에는 create_task()가 적합하다는 정보를 얻을 수 있었습니다.

     

    파이썬 asyncio 공식문서 - 새로운 Task를 만들 때 create_task()가 선호됨

     

    코루틴임이 확실한 인스턴스에 대해 스케줄링하고 싶은 경우에는 create_task()가 가장 적합한 API이다. ensure_future()를 호출해야 하는 유일한 경우는 코루틴 혹은 Future일 수 있는 인스턴스를 받아서 그 인스턴스에 Future로만 할 수 있는 동작을 수행해야하는 경우뿐이다.
     - 귀도 반 로섬, https://oreil.ly/cSOFB 이슈 #477의 댓글

    출처: 파이썬 비동기 라이브러리 Asyncio(2021) p. 64

     

     따라서 ensure_future()를 create_task()로 변경하기로 했고 고수준과 저수준의 정확한 차이를 잘 모르겠어서 asyncio.create_task()로도 바꿔보고, loop.create_task()로도 바꿔본 결과 loop.create_task()로 수정했을 때 에러가 발생하지 않았습니다. 운으로 오류는 잡았으나,, 여전히 왜 해결됐는지는 이해가 가지 않았습니다.

     

     참고로 asyncio 공식 문서에 따르면 ensure_future()는 인자가 코루틴이라면 Future 객체로 감싼 후 Task 객체로 반환하고 인자가 Task나 Future라면 그대로 Task나 Future를 반환한다고 되어있습니다. 또, gather()는 내부적으로 ensure_future()를 사용합니다. 그래서 혹시.. table_task()가 gather()를 사용하는 L4Chgmgr.collect()를 await하고 있고 table_task()가 코루틴이 아니기 때문에 문제가 생긴 것이 아닐까 하는 생각에 table_task의 타입을 확인해봤습니다.

     

    print("Is table_task future?", asyncio.isfuture(table_task()))
    print("Is table_task coroutine?", asyncio.iscoroutine(table_task())
    print("Is table_task coroutine funtion?", asyncio.iscoroutinefuction(table_task())

     

    위 table_task의 타입을 출력하는 코드 실행 결과 (삽질 no.111231311)

     

     음.. 코루틴이군 하며 에러 메세지를 다시 보니 "RuntimeWarning: coroutine 'Manager.table_task' was never awaited"라고 명시되어 있었습니다. 저의 독해 실력에 한번 더 머리가 아파왔습니다. '코루틴'인 Manager.table_task가 await 되지 않았다... 🥲 다시 await되지 않았다는 에러는 언제 발생하는지 구글링을 해보니 await 키워드를 사용하지 않으면 해당 에러가 발생한다고 되어있었습니다. 즉, 코루틴 table_task를 await로 실행하지 않은 것입니다. 따라서 await를 사용하는 코드로 변경해보기로 했습니다.

     

    # 변경 전
    for target in self.targets:
        vendor_tasks.append(asyncio.ensure_future(self.table_task(host, table_nm, oid_list))
        
    # 변경 후
    await asyncio.gather(*[self.table_task(target[host], target[table_nm], target[oid_list]) for target in self.targets])

     

     await 키워드를 사용하여 비동기로 수행되는 다른 작업의 완료를 기다리는 동안 코드의 실행을 Non-Blocking하는 구조는 async def로 정의된 네이티브 코루틴 함수 내에서만 가능합니다. 하지만 table_task()를 실행시키는 _run()은 일반 함수였습니다. await 키워드를 사용하기 위해 _run()을 네이티브 코루틴 함수로 만든다 하더라도 타 엔진들도 real.py에서 run을 호출하기 때문에 run 함수를 수정할 수 없어 이 방법은 포기했습니다.

     

     또 다른 방법을 찾아나섰습니다. 일단 ensure_future()가 아닌 create_task()를 사용할 때 에러가 발생하지 않았다는 것은 알았기 때문에 ensure_future함수와 create_task함수가 asyncio 라이브러리에 어떻게 작성되어있는지 확인해보기로 했습니다. 눈에 띄는 차이점으로는 loop 객체를 얻기 위해 사용하는 메서드가 다르다는 점을 발견했습니다. ensure_future()는 get_event_loop()를 사용하고 create_task()는 get_running_loop()를 사용합니다.

     

    # asyncio/tasks.py
    
    def ensure_future(coro_or_future, *, loop=None):
        """Wrap a coroutine or an awaitable in a future.
        If the argument is a Future, it is returned directly.
        """
        if futures.isfuture(coro_or_future):
            if loop is not None and loop is not futures._get_loop(coro_or_future):
                raise ValueError('The future belongs to a different loop than '
                                'the one specified as the loop argument')
            return coro_or_future
        should_close = True
        if not coroutines.iscoroutine(coro_or_future):
            if inspect.isawaitable(coro_or_future):
                async def _wrap_awaitable(awaitable):
                    return await awaitable
    
                coro_or_future = _wrap_awaitable(coro_or_future)
                should_close = False
            else:
                raise TypeError('An asyncio.Future, a coroutine or an awaitable '
                                'is required')
    
        if loop is None:
            loop = events.get_event_loop()
        try:
            return loop.create_task(coro_or_future)
        except RuntimeError:
            if should_close:
                coro_or_future.close()
            raise
    
    def create_task(coro, *, name=None, context=None):
        """Schedule the execution of a coroutine object in a spawn task.
        Return a Task object.
        """
        loop = events.get_running_loop()
        if context is None:
            # Use legacy API if context is not needed
            task = loop.create_task(coro)
        else:
            task = loop.create_task(coro, context=context)
    
        _set_task_name(task, name)
        return task

     

      아래는 get_event_loop함수와 get_running_loop함수의 소스코드입니다. get_event_loop()는 '현재 context에 존재하는' 이벤트루프를 반환하고 get_running_loop()는 '현재 OS thread'에서 실행 중인 이벤트 루프를 반환합니다. 따라서 ensure_future()를 사용했을 때 발생한 에러(RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.)는 '현재 context에 존재하는 이벤트루프'를 가져올 수 없어 발생한 것이었음을 확인할 수 있었습니다. 그리고 create_task()는 manager init 시점에서 asyncio.get_running_loop()로 얻은 '현재 OS thread에서 실행 중인 루프'를 가져오기 때문에 에러가 발생하지 않은 것이라는 결론을 얻게 되었습니다.

     

    # asyncio/events.py
    
    def get_event_loop(self):
        """Get the event loop for the current context.
        Returns an instance of EventLoop or raises an exception.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                threading.current_thread() is threading.main_thread()):
            stacklevel = 2
            try:
                f = sys._getframe(1)
            except AttributeError:
                pass
            else:
                # Move up the call stack so that the warning is attached
                # to the line outside asyncio itself.
                while f:
                    module = f.f_globals.get('__name__')
                    if not (module == 'asyncio' or module.startswith('asyncio.')):
                        break
                    f = f.f_back
                    stacklevel += 1
            import warnings
            warnings.warn('There is no current event loop',
                          DeprecationWarning, stacklevel=stacklevel)
            self.set_event_loop(self.new_event_loop())
    
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
    
        return self._local._loop
    
    def _get_running_loop():
        """Return the running event loop or None.
        This is a low-level function intended to be used by event loops.
        This function is thread-specific.
        """
        # NOTE: this function is implemented in C (see _asynciomodule.c)
        running_loop, pid = _running_loop.loop_pid
        if running_loop is not None and pid == os.getpid():
            return running_loop
    
    def get_running_loop():
        """Return the running event loop.  Raise a RuntimeError if there is none.
        This function is thread-specific.
        """
        # NOTE: this function is implemented in C (see _asynciomodule.c)
        loop = _get_running_loop()
        if loop is None:
            raise RuntimeError('no running event loop')
        return loop

     

     다시 서적을 들춰보며 이벤트 루프에 대해서 찾아보니 이전에는 보이지 않던 내용이 바로 보입니다. async def로 정의된 코루틴함수 내부에서 루프를 호출할 때는 asyncio.get_running_loop()를 사용해야한다고 합니다. asyncio 공식 문서에서도 get_event_loop()보다 get_running_loop()가 선호된다고 명시되어있습니다. 이제야 왜 Task를 만들 때 ensure_future()보다 get_running_loop()로 루프 객체를 얻는 create_task()가 선호되는지도 이해가 됩니다.

     

    asyncio.get_event_loop()는 코루틴을 실행하기 위해서 필요한 루프 인스턴스를 얻는 문법이다. 동일 스레드에서 호출하면 코드의 어느 부분에서 get_event_loop()를 호출하든 매번 똑같은 루프 인스턴스를 반환한다. 그런데 async def 함수 내에서 호출하는 경우에는 asyncio.get_running_loop()를 호출해야 한다.

    출처: 파이썬 비동기 라이브러리 Asyncio(2021) p. 41

     

    파이썬 asyncio 공식문서 - 이벤트 루프를 가져올 때 get_event_loop()보다 get_running_loop()가 선호됨

     

     마지막으로 get_event_loop()에서 등장했던 context에 대한 개념을 알아보고자 합니다. context란 사용자와 사용자, 사용자와 시스템, 사용자와 디바이스간의 상호작용에 영향을 미치는 레지스터 값을 말합니다.

     

     멀티프로세스 환경에서 CPU가 하나의 프로세스를 실행하고 있는 상태에서 인터럽트 요청에 의해 다음 우선순위의 프로세스를 실행시킬 때, 동작 중인 프로세스가 해당 프로세스의 context와 상태 등을 보관하고 대기하고 있던 다음 순번 프로세스의 context와 상태 등을 적재하여 이전에 보관했던 프로세스 context를 복구하게 됩니다. 이렇게 CPU가 다음 프로세스를 수행하도록 새로운 프로세스의 context로 교체하는 작업을 context switching이라고 합니다.

     

     참고로 인터럽트 요청의 종류에는 I/O request(입출력 요청), time slice expired(CPU 사용 시간 만료), fork a child(자식 프로세스 생성), wait for an interrupt(인터럽트 처리 대기) 등이 있습니다.

     

     프로세스의 상태는 해당 프로세스 제어 블록(Process Control Block, PCB)에 기록됩니다. PCB란 프로세스를 관리하는데 있어 필요한 정보를 담고 있는 운영체제 커널의 자료구조를 말하는데, 프로세스가 생성될 때마다 고유의 PCB가 생성되며 프로세스가 끝나면 PCB는 제거됩니다. PCB에 저장하는 정보들에는 프로세스 상태, 프로세스 번호, 프로세스 카운터, 레지스터(context) 등이 있습니다.

     

     이 시점에서 다시 '파이썬 비동기 라이브러리 Asyncio'에서 context switching에 대해서 찾아봤습니다. context switching은 'await' 키워드로 제어할 수 있다고합니다.

     

    멀티태스킹을 적절히, 협력적으로 구성하기 위해서는 I/O 위주 함수들을 적당히 어우러지게 사용해야 한다. 이는 await 키워드를 사용하여 루프로 콘텍스트 전환을 제어한다는 의미다.

    출처: 파이썬 비동기 라이브러리 Asyncio(2021) p. 42

     

    서로 다른 인스턴스 간에 콘텍스트 전환을 할 수 있는 유일한 위치는 바로 await 키워드가 있는 곳이다. 이 함수의 나머지 부분에서는 콘텍스트 전환이 일어날 수 없다.

    출처: 파이썬 비동기 라이브러리 Asyncio(2021) p. 165

     

     

     정리해보면

    real.py를 통해 main.py를 실행시킬 때 context 전환이 일어났고, get_event_loop()로 루프 객체를 얻는 ensure_future()를 사용했던 기존 코드에서는 table_task() 실행 시 main.py에서 real.py로 context switching이 일어났고, main.py에서 real.py로 넘어간 루프를 찾을 수 없어 RuntimeError가 발생했습니다. 수동으로 main.py를 직접 실행시킬 때는 context switching이 일어나지 않아 에러가 발생하지 않았습니다.

     

    real.py가 main.py를 실행시킬 때 context switching이 발생한다.

     

     결론적으로 await 키워드를 사용하는 등 구조적 변경 없이  ensure_future()를 get_running_loop()로 루프 객체를 얻는 create_task()로 변경함으로써 새로운 이벤트 루프를 얻어 RuntimeError는 해결하였으나 thread를 하나 더 생성하는 꼴과 같아졌으므로 asyncio를 사용하면서도 threading의 단점을 가져가는 구조라는 점은 개선할 부분으로 남았습니다.

     

    # 변경 전
    for target in self.targets:
        vendor_tasks.append(asyncio.ensure_future(self.table_task(host, table_nm, oid_list))
        
    # 변경 후
    for target in self.targets:
        vendor_tasks.append(self.loop.create_task(self.table_task(host, table_nm, oid_list))

     

     + 추가로 발견한 사실,, 공식 문서에서 never-awaited 코루틴으로 인한 RuntimeWarning 해결법으로 await 키워드를 사용하거나 asynio.create_task()를 사용할 것을 제시하고 있습니다. 

     

    never-awaited coroutine을 await 하는 방법

     

    참고:

    • "Using Asycio in Python - 대규모 병행성 작업을 위한 비동기 프로그래밍 (2021)"

     

    • asyncio 공식 문서

    https://docs.python.org/3/library/asyncio-future.html?highlight=asyncio%20ensure_future#asyncio.ensure_future 

     

    Futures

    Source code: Lib/asyncio/futures.py, Lib/asyncio/base_futures.py Future objects are used to bridge low-level callback-based code with high-level async/await code. Future Functions: Future Object: T...

    docs.python.org

     

    • asyncio 소스코드

    https://github.com/python/cpython/blob/e5bd5ad70d9e549eeb80aadb4f3ccb0f2f23266d/Lib/asyncio/events.py

     

    GitHub - python/cpython: The Python programming language

    The Python programming language. Contribute to python/cpython development by creating an account on GitHub.

    github.com

     

    • RuntimeWarning관련

    https://tech.buzzvil.com/blog/asyncio-no-1-coroutine-and-eventloop/

     

    asyncio 뽀개기 1 - Coroutine과 Eventloop

    이 시리즈의 목적은 asyncio의 컴포넌트들과 활용법을 소개하는 것입니다. 최종적으로는 실제 production에 쓰이고 있는 graceful shutdown을 구현하는 것을 목표로 하며, 그 과정에서 필요한 asyncio 지식

    tech.buzzvil.com

     

    • context switching

    https://velog.io/@wejaan/%EC%9A%B4%EC%98%81%EC%B2%B4%EC%A0%9C-Context-Switching-Process-Control-Block

     

    [운영체제] Context Switch

    컨텍스트 스위칭은 CPU에 실행할 프로세스를 교체하는 기술이다. 프로세스 스케줄링에서 프로세스를 계속해서 바꿔주는 기술들에 사용되는 게 바로 컨텍스트 스위칭이다. 프로세스는 다음에 실

    velog.io

    https://superfastpython.com/context-switch-interval-in-python/

     

    Context Switch Interval In Python

    You can change how often Python threads may context switch via the sys.setswitchinterval() function. In this tutorial you will discover how to retrieve and change the switch interval in Python. Let…

    superfastpython.com

    https://jeong-pro.tistory.com/93

     

    OS - Context Switch(컨텍스트 스위치)가 무엇인가?

    Context Switching이 무엇인가? 멀티프로세스 환경에서 CPU가 어떤 하나의 프로세스를 실행하고 있는 상태에서 인터럽트 요청에 의해 다음 우선 순위의 프로세스가 실행되어야 할 때 기존의 프로세스

    jeong-pro.tistory.com

    https://en.wikipedia.org/wiki/Context_switch

     

    Context switch - Wikipedia

    From Wikipedia, the free encyclopedia Switch between processes or tasks on a computer This article is about computer task switching. For the term in human cognition, see Human multitasking. In computing, a context switch is the process of storing the state

    en.wikipedia.org

    댓글

Designed by Tistory.