Programming/Python

[python] asyncio로 boto3 실행시키기 (부제 : asyncio란?)

chronosa 2021. 1. 1. 14:42

boto3를 비동기적으로 사용할 수 있는 방법이 있는가에 대해 열심히 찾아보았으나, 결론적으로는 Threadpool 기반의 MultiThread로 실행해야 함을 깨닫고 asyncio와 관련된 내용을 정리해놓고자 한다. 내용은 주로 "파이썬답게 코딩하기"를 참조하였으며, 이 외에는 공식 Document를 상당부분 참조하였다.

1. 비동기 논블록 방식의 시작 - concurrent.futures

asyncio 이전에 개발된 멀티 스레딩/프로세싱을 고도화한 모듈이다. 비동기논블록 방식을 스레드와 프로세스로 구현하였다. 멀티 스레딩/프로세싱에서 데몬으로 작업을 실행시킨 것과 유사한 개념이라고 한다. 내부적으로 크게 Executor와 Future, 그리고 Module Functions이 있다.

 

1.1.1. Executor 

호출 가능한 객체(함수, 코루틴 등)을 비동기적으로 호출시켜주는 일종의 실행기. ThreadPoolExecutor와 ProcessPoolExecutor가 있다. 

 

1.1.2. Future

호출 가능한 객체의 비동기 실행을 캡슐화한다. 즉, Executor에서 submit 메서드로 비동기 함수가 등록되면 Future 객체가 반환되고 이를 통해 결과나 현재 상태를 조회할 수 있다. 

 

1.1.3. Module Functions

비동기 처리로 인해 병렬화된 작업을 동기화하기 위한 API를 제공한다. wait과 as_completed 두 가지 메서드가 존재하며 wait은 인자로 받은 시간만큼 기다렸다가 작업이 완료된 것과 안 된 것을 따로 나누어 반환하며 as_completed는 비동기로 실행 중인 함수의 집합을 받아서 하나씩 순회하며 완료되기를 기다린다.

2. asyncio

asyncio란 파이썬에서 정식으로 제공하는 비동기 논블록 I/O를 위한 모듈이다. concurrent.futures는 결국 스레드나 프로세스를 사용하기 때문에 이로 인해 발생하는 성능저하가 그대로 남아 있게 된다. 이를 보완하기 위해 나온 것이 제너레이터 기반의 코루틴을 사용하여 비동기 논블록을 구현하는 것이다. Python 3.5부터는 native 코루틴을 사용할 수 있게 되었다. asyncio에는 비동기 처리를 위한 coroutine이 있고, event loop에 코루틴으로 만든 작업(Task)를 등록하여 사용하게 된다.

 

아래는 asyncio를 이해하기 위한 기본적인 용어와 개념이다.

 

2.1. event loop

Node.js에서도 볼 수 있는 개념이다. Python Document에 따르면 아래와 같이 정의되어 있다.

 

"이벤트 루프는 모든 asyncio 응용 프로그램의 핵심입니다. 이벤트 루프는 비동기 태스크 및 콜백을 실행하고 네트워크 IO 연산을 수행하며 자식 프로세스를 실행합니다."

 

즉, event loop 안에서 코루틴으로 된 함수들이 동작하게 되며, 이러한 함수들은 I/O 연산이나 CPU를 사용하지 않을 때에는 제어권을 양보한다. (yield from 또는 await을 통해)

 

일련의 코루틴들은 event loop에 속하며, event loop의 스케쥴링 매커니즘에 따라 호출된다. 각각의 coroutine이 실행되면 사용자의 코드가 실행되고 await <coroutine>을 호출하게 되면 이벤트 루프에 제어권을 반환하게 된다.

asyncio에서는 event loop에 coroutine을 직접 등록할 수도 있고, task 객체로 만들어서 등록할 수도 있다. 

 

2.2. await

어웨이터블 에서 코루틴 의 실행을 일시 중지합니다. 오직 코루틴 함수 에서만 사용할 수 있습니다.

기존의 데코레이터 기반 코루틴에서의 yield from과 동일한 연산자로, 현재 진행 중인 코루틴의 실행을 일시 중지하고 await을 통해 반환되는 코루틴의 작업이 끝날 때까지 기다린다.

 

2.3. awaitable

Python 도큐먼트에 따르면, 어웨이터블 객체란 아래와 같다.

 

"우리는 객체가 await 표현식에서 사용될 수 있을 때 어웨이터블 객체라고 말합니다. 많은 asyncio API는 어웨이터블을 받아들이도록 설계되었습니다. 어웨이터블 객체에는 세 가지 주요 유형이 있습니다: 코루틴, 태스크  퓨처"

 

이러한 awaitable 객체는 다른 코루틴에서 기다려줄 수 있음을 뜻한다. 공식 홈페이지에 있는 아래의 예시 코드를 보면 이해를 쉽게 할 수 있다.

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

 

2.4. Future & Task

asyncio에서 Future란 작업을 관리하는 역할을 한다. Task에 비해 저수준의 API이며, 여기서의 Future는 concurrent 모듈의 Future와는 다른 object이다. 또한 Future는 비동기 연산의 최종 결과를 나타낸다. 코루틴은 결과나 예외가 설정되거나 취소될 때까지 Future 객체를 기다릴 수 있다.

Task는 Future를 감싼 것으로, 코루틴을 실행하는 Fature 류의 객체이다. asyncio는 작업을 Task 객체의 단위로 관리한다.

 

공식 홈페이지에 있는 아래의 예제가 Future와 Task를 이해하는데 도움이 될 것 같다.

# 이 예제는 Future 객체를 만들고, Future에 결과를 설정하는 비동기 Task를 만들고 예약하며, 
# Future가 결과를 얻을 때까지 기다립니다:

async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)

    # Set *value* as a result of *fut* Future.
    fut.set_result(value)

async def main():
    # Get the current event loop.
    loop = asyncio.get_running_loop()

    # Create a new Future object.
    fut = loop.create_future()

    # Run "set_after()" coroutine in a parallel Task.
    # We are using the low-level "loop.create_task()" API here because
    # we already have a reference to the event loop at hand.
    # Otherwise we could have just used "asyncio.create_task()".
    loop.create_task(
        set_after(fut, 1, '... world'))

    print('hello ...')

    # Wait until *fut* has a result (1 second) and print it.
    print(await fut)

asyncio.run(main())

 

또한 asyncio.ensure_future 함수의 정의 일부를 보면 Task가 Future를 상속받은 것을 확실히 알 수 있다.

 

※ 단, 아래에서도 설명하겠지만 loop.run_in_executor 을 통해 concurrent.futures.ThreadPoolExecutor이나 concurrent.futures.ProcessPoolExecutor을 실행시킬 수 있다. (asyncio.Future 객체를 반환하게 할 수 있다.) 즉, 일반적인 라이브러리(boto3 등)을 concurrent를 통해 asyncio의 loop 내에서 실행시킬 수 있다. 이러한 테크닉은 웹서버를 개발할 때 사용할 수 있을 것으로 보인다

 

2.5. coroutine

기본적으로 coroutine은 특정 위치에서 실행과 정지를 반복할 수 있도록 여러 진입점과 진출점을 가지고 있는 함수이다. 즉, 반환을 여러 번 할 수 있는 함수이다. yield를 사용하여 값을 반환하거나 입력을 받는 제너레이터와 매우 유사한 개념이나, Python 3.5 부터는 coroutine은 별도의 타입으로 빠지게 되었다.

async 문법을 함수의 정의 앞에 붙여 코루틴을 정의할 수 있으며, await을 사용하여 작업이 끝날 때까지 기다릴 수 있다.

 

※ 3.5 이전에는 제너레이터 기반의 코루틴이 사용되었다. async 대신 @asyncio.coroutine의 데코레이터가 사용이 되었고, await은 yield from이 사용되었다.

※ async 문법을 사용한 코루틴을 native 코루틴이라고 하는데, Python 3.6부터는 native 코루틴에서도 yield나 yield from을 사용할 수 있도록 변경되었다고 한다. (= native coroutine으로 generator 생성 가능)

3. boto3와 같은 동기 라이브러리를 비동기적으로 실행시키는 방안

boto3와 같은 기존에 존재하던 라이브러리르 비동기적으로 실행시키기 위해서는 기본적으로 concurrent.futures를 사용하여야 하나, Future 객체를 반환하는 저수준 함수인 loop.run_in_executor를 활용하면 ThreadPool에서 asyncio의 event loop 내에서 실행을 시킬 수 있다. 아래는 타 블로거 분께서 작성한 예시 코드이다. 상세설명을 보려면 해당 블로그를 참조하도록 하자.

import math
import concurrent.futures
import asyncio

# The definition of is_prime has been cut from this file

# Wrapping corouting which waits for return from process pool.
async def get_result(executor, n):
    loop = asyncio.get_event_loop()
    prime = await loop.run_in_executor(executor, is_prime, n)
    return n, prime

# Scheduling the run in the asyncio event loop
async def main():
    prime_candidates = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
        17,
        4]

    # create the process pool
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Calling the asyncio coroutines returns futures.
        futures = [get_result(executor, n) for n in prime_candidates]

        # As futures are completed they are returned and the result can be obtained
        for i, future in enumerate(asyncio.as_completed(futures)):
            n, prime = await future
            if prime:
                print("{}: {} is prime".format(i,n))
            else:
                print("{}: {} is not prime".format(i,n))

if __name__=='__main__':
    # This creates the event loop and runs main in the loop until main returns.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

 

아래 코드는 테스트를 위해 간단히 구현한 코드로, 모든 리전의 aws instances 정보를 가져오는 스크립트이다.

위의 코드와 컨셉은 유사하지만, 디테일에는 약간의 차이가 있다. 

from pprint import pprint
from concurrent.futures import ThreadPoolExecutor
import asyncio
import boto3

_executor = ThreadPoolExecutor(10)

async def execute(region):
    print(f"call the region {region}")

    loop = asyncio.get_running_loop()
    ec2 = boto3.client("ec2", region_name=region)
    response = await loop.run_in_executor(_executor, ec2.describe_instances)
    return response

async def main():
    ec2 = boto3.client("ec2", region_name="ap-northeast-2")
    regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']]

    task_list = [asyncio.ensure_future(execute(region)) for region in regions]
    done, pending = await asyncio.wait(task_list)

    results = [d.result() for d in done]
    return results

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(main())
    pprint(result)

※ asyncio.wait은 aws 이터러블에 있는 어웨이터블 객체를 동시에 실행하고, return_when에 의해 지정된 조건을 만족할 때까지 블록한다. (참조)

 

async용으로 개발된 boto3도 있는 것으로 보이는데, 지원되는 서비스가 많지 않아서 아직 사용은 어려울 것 같다.

4. 기타

4.1. asyncio.wait example을 모아놓은 사이트 : www.programcreek.com/python/example/82053/asyncio.wait

4.2. 거의 동일한 주제로 글을 쓰신 블로거분의 사이트, 많은 도움이 되었다. : medium.com/tysonworks/concurrency-with-boto3-41cfa300aab4

4.3. coroutine의 실행에 관해.. : stackoverflow.com/questions/32308382/execute-coroutine-from-call-soon-callback-function

※ 참고문서

[1] 파이썬 클린 코드, 마리아노 아나야 지음, 김창수 옮김

[2] 파이썬답게 코딩하기, 심경섭 지음

[3] medium.com/tysonworks/concurrency-with-boto3-41cfa300aab4

 

Concurrency with Boto3

Asyncio provides set of tools for concurrent programming in Python. In a very simple sense it does this by having an event loop execute a…

medium.com