맛동산

Tornado Docs를 읽어보자 : User's guide - Coroutines 본문

파이썬/Tornado Framework

Tornado Docs를 읽어보자 : User's guide - Coroutines

오지고지리고알파고포켓몬고 2017. 12. 30. 16:07

본 글은 tornado 학습 목적으로 의역으로 작성한 글이며, 오역이 있을 수 있음을 알려드리고 사실과 다른 내용이 발견될 때 마다 수정 작업을 수행할 예정입니다. 

기술적인 부분은 기본적인 사항 파악 후에 작성하도록 하겠습니다.


원문 - http://www.tornadoweb.org/en/stable/guide/coroutines.html


Coroutines

Coroutines are the recommended way to write asynchronous code in Tornado. Coroutines use the Python yield keyword to suspend and resume execution instead of a chain of callbacks (cooperative lightweight threads as seen in frameworks like gevent are sometimes called coroutines as well, but in Tornado all coroutines use explicit context switches and are called as asynchronous functions).

Coroutines are almost as simple as synchronous code, but without the expense of a thread. They also make concurrency easier to reason about by reducing the number of places where a context switch can happen.

Example:

from tornado import gen

@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    # In Python versions prior to 3.3, returning a value from
    # a generator is not allowed and you must use
    #   raise gen.Return(response.body)
    # instead.
    return response.body


Python 3.5: async and await

Python 3.5 introduces the async and await keywords (functions using these keywords are also called “native coroutines”). Starting in Tornado 4.3, you can use them in place of most yield-based coroutines (see the following paragraphs for limitations). Simply use async def foo() in place of a function definition with the @gen.coroutine decorator, and await in place of yield. The rest of this document still uses the yield style for compatibility with older versions of Python, but async and await will run faster when they are available:

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

The await keyword is less versatile than the yield keyword. For example, in a yield-based coroutine you can yield a list of Futures, while in a native coroutine you must wrap the list intornado.gen.multi. This also eliminates the integration with concurrent.futures. You can use tornado.gen.convert_yielded to convert anything that would work with yield into a form that will work with await:

async def f():
    executor = concurrent.futures.ThreadPoolExecutor()
    await tornado.gen.convert_yielded(executor.submit(g))

While native coroutines are not visibly tied to a particular framework (i.e. they do not use a decorator like tornado.gen.coroutine or asyncio.coroutine), not all coroutines are compatible with each other. There is a coroutine runner which is selected by the first coroutine to be called, and then shared by all coroutines which are called directly with await. The Tornado coroutine runner is designed to be versatile and accept awaitable objects from any framework; other coroutine runners may be more limited (for example, the asyncio coroutine runner does not accept coroutines from other frameworks). For this reason, it is recommended to use the Tornado coroutine runner for any application which combines multiple frameworks. To call a coroutine using the Tornado runner from within a coroutine that is already using the asyncio runner, use thetornado.platform.asyncio.to_asyncio_future adapter.

How it works

A function containing yield is a generator. All generators are asynchronous; when called they return a generator object instead of running to completion. The @gen.coroutine decorator communicates with the generator via the yield expressions, and with the coroutine’s caller by returning a Future.

Here is a simplified version of the coroutine decorator’s inner loop:

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

The decorator receives a Future from the generator, waits (without blocking) for that Future to complete, then “unwraps” the Future and sends the result back into the generator as the result of the yield expression. Most asynchronous code never touches the Future class directly except to immediately pass the Future returned by an asynchronous function to a yield expression.

How to call a coroutine

Coroutines do not raise exceptions in the normal way: any exception they raise will be trapped in the Future until it is yielded. This means it is important to call coroutines in the right way, or you may have errors that go unnoticed:

@gen.coroutine
def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

In nearly all cases, any function that calls a coroutine must be a coroutine itself, and use the yieldkeyword in the call. When you are overriding a method defined in a superclass, consult the documentation to see if coroutines are allowed (the documentation should say that the method “may be a coroutine” or “may return a Future”):

@gen.coroutine
def good_call():
    # yield will unwrap the Future returned by divide() and raise
    # the exception.
    yield divide(1, 0)

Sometimes you may want to “fire and forget” a coroutine without waiting for its result. In this case it is recommended to use IOLoop.spawn_callback, which makes the IOLoop responsible for the call. If it fails, the IOLoop will log a stack trace:

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

Using IOLoop.spawn_callback in this way is recommended for functions using @gen.coroutine, but it is required for functions using async def (otherwise the coroutine runner will not start).

Finally, at the top level of a program, if the IOLoop is not yet running, you can start the IOLoop, run the coroutine, and then stop the IOLoop with the IOLoop.run_sync method. This is often used to start the main function of a batch-oriented program:

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

Coroutine patterns

Interaction with callbacks

To interact with asynchronous code that uses callbacks instead of Future, wrap the call in a Task. This will add the callback argument for you and return a Future which you can yield:

@gen.coroutine
def call_task():
    # Note that there are no parens on some_function.
    # This will be translated by Task into
    #   some_function(other_args, callback=callback)
    yield gen.Task(some_function, other_args)

Calling blocking functions

The simplest way to call a blocking function from a coroutine is to use a ThreadPoolExecutor, which returns Futures that are compatible with coroutines:

thread_pool = ThreadPoolExecutor(4)

@gen.coroutine
def call_blocking():
    yield thread_pool.submit(blocking_func, args)

Parallelism

The coroutine decorator recognizes lists and dicts whose values are Futures, and waits for all of those Futures in parallel:

@gen.coroutine
def parallel_fetch(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

@gen.coroutine
def parallel_fetch_many(urls):
    responses = yield [http_client.fetch(url) for url in urls]
    # responses is a list of HTTPResponses in the same order

@gen.coroutine
def parallel_fetch_dict(urls):
    responses = yield {url: http_client.fetch(url)
                        for url in urls}
    # responses is a dict {url: HTTPResponse}

Interleaving

Sometimes it is useful to save a Future instead of yielding it immediately, so you can start another operation before waiting:

@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

This pattern is most usable with @gen.coroutine. If fetch_next_chunk() uses async def, then it must be called as fetch_future = tornado.gen.convert_yielded(self.fetch_next_chunk()) to start the background processing.

Looping

Looping is tricky with coroutines since there is no way in Python to yield on every iteration of a for or while loop and capture the result of the yield. Instead, you’ll need to separate the loop condition from accessing the results, as in this example from Motor:

import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()

Running in the background

PeriodicCallback is not normally used with coroutines. Instead, a coroutine can contain a while True: loop and use tornado.gen.sleep:

@gen.coroutine
def minute_loop():
    while True:
        yield do_something()
        yield gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

Sometimes a more complicated loop may be desirable. For example, the previous loop runs every 60+N seconds, where N is the running time of do_something(). To run exactly every 60 seconds, use the interleaving pattern from above:

@gen.coroutine
def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        yield do_something()  # Run while the clock is ticking.
        yield nxt             # Wait for the timer to run out.





Coroutines


Coroutine은 토네이도에서 비동기 코드를 작성하는데 추천되는 방법입니다. 코루틴은 파이썬 yield 키워드를 사용하여, 콜백 체인 대신 실행(execution)을 연기하고 재개합니다.(gervent같은 프레임워크 안에있는 경량 스레드들의 집합은 때때로 coroutine이라고 불립니다. 하지만 토네이도에서 모든 coroutine은 context switch를 사용하고, 비동기 함수로 호출됩니다.)1)


Coroutine은 거의 동기식 코드만큼 간단하지만, 스레드를 사용하지 않아도 됩니다. 또한 컨텍스트 전환이 발생할 수있는 곳의 수를 줄임으로써 동시성을 추론하기가 더 쉽습니다.


Example:

from tornado import gen

@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    # In Python versions prior to 3.3, returning a value from
    # a generator is not allowed and you must use
    #   raise gen.Return(response.body)
    # instead.
    return response.body


Python 3.5:async and await


파이썬 3.5는 async와 await 키워드를 도입했습니다.(이러한 키워드를 사용하는 함수들은 "native coroutines"이라고 불려집니다.) Tornado 4.3부터는 대부분 yield 기반 coroutine들을 사용할 수 있습니다.(제한사항은 다음 단락을 참조하세요.) 함수 정의 대신 @gen.coroutine 데코레이터 대신 async def foo()와 같이 사용할 수 있고, await으로 yield를 대신할 수 있습니다. 이 문서의 나머지 부분에서는 이전 버전의 파이썬과 호환을 위해 yield를 사용합니다. 하지만 async와 await을 사용한다면 더 빠르게 동작할 것 입니다.

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

await 키워드는 yield만큼 다양한 곳에 사용할 수 없습니다. 예를들어, yield기반 coroutine은 Futures 리스트를 yield할 수 있지만, native coroutine에서 리스트를 tornato.gen.multi로 반드시 래핑해야합니다. 이것은 또한 concurrent.futures와의 통합을 제거합니다.(?) 당신은 tornado.gen.convert_yielded을 사용하여, yield로 작동하는 것을 await으로 사용할 수 있게 무엇이든 변환할 수 있습니다.

async def f():
    executor = concurrent.futures.ThreadPoolExecutor()
    await tornado.gen.convert_yielded(executor.submit(g))

native coroutine은 특정 프레임워크와 가시적으로 연관 되보이지 않지만,(native coroutine은 tornado.gen.coroutine 또는 asyncio.coroutine같은 데코레이터를 사용하지 않습니다.) 모든 coroutine은 서로 호환되지 않습니다.(?) 첫번째 코루틴에 의해 선택되는, await에 직접적으로 호출되는 모든 coroutine에 의해 공유되는 coroutine runner가 있습니다. Tornado의 coroutine runner는 다용도로 설계되었고, 여러 프레임워크의 awaitable한 객체를 수용합니다. 다른 coroutine runner는 더 제한될 수 있습니다.(asyncio coroutine runner는 다른 프레임워크의 coroutine을 수용하지 않습니다.) 이런 이유에서, Tornado coroutine runner는 여러 프레임워크가 결합된 응용 프로그램에 사용하는 것을 추천합니다. 이미 asyncio runner를 사용중인 coroutine에서 Tornado runner를 사용하여 coroutine을 호출하려면tornado.platform.asyncio.to_asyncio_future 어댑터를 사용하세요.


How it works


yield가 있는 함수는 제네레이터입니다. 모든 제네레이터는 비동기식입니다;2) 제네레이터는 호출되면 완료될 때까지 실행하는 대신 제네레이터 객체를 반환합니다. @gen.coroutine 데코레이터는 yield 표현식과 Future를 반환하는 coroutine의 호출자를 통해 생성기와 통신합니다. 


다음은 coroutine 데코레이터의 내부 루프의 단순화 된 버전입니다.
# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

데코레이터는 제네레이터에서 Future를 받습니다. Future의 완료까지 대기하고(blocking 없이) yield 표현식의 결과로 제네레이터에 다시 전송합니다.(?) 대부분 비동기 코드는 비동기 함수에서 반환된 Future를 yield에 직접 전달하는 경우를 제외하고 절대 Future 클래스에 직접적으로 건드리지 않습니다.


How to call a coroutine


coroutine은 평범한 방법으로 예외를 발생시키지 않습니다: 모든 예외는 Future가 yield되기 전까지 발현되지 않을 것 입니다. 이것은 올바른 방법으로 coroutine을 호출하는 것이 중요하다는 것과 주목받지 못한 오류가 있을 수 있음을 의미합니다.

@gen.coroutine
def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

거의 모든 경우에, coroutine을 호출하는 모든 함수는 coroutine 자체여야합니다. 그리고 호출에 yield 키워드를 사용해야합니다. 슈퍼 클래스에 정의된 메소드를 재정의 하는 경우, 설명서를 참조하여 동시 루틴이 허용되는지 확인하세요.

@gen.coroutine
def good_call():
    # yield will unwrap the Future returned by divide() and raise
    # the exception.
    yield divide(1, 0)
때때로 결과를 기다리지 않고 "fire and forget"(실행과 망각)을 원할 수 있습니다. 이 경우에, 호출에 응답 가능한 IOLoop을 만드는 IOLoop.spawn_callback을 사용하는 것이 좋습니다. 만약 실패하면 IOLoop가 로그를 추적합니다:
# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

이 방법으로 IOLoop.spawn_callback을 사용하는 것은 @ gen.coroutine을 사용하는 함수에 권장됩니다. 이것은 async def를 사용하는 함수에 필요합니다.(그렇지 않으면 coroutine runner가 시작되지 않습니다.)


마침내, 프로그램 최상위 레벨에서, IOLoop가 아직 실행되지 않는다면, 당신은 IOLoop를 시작할 수 있습니다, coroutine을 실행하고, IOLoop의 IOLoop.run_sync 함수를 사용해서 멈추세요. 이것은 종종 배치 지향 프로그램의 주요 기능을 시작하는데 사용됩니다.

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

Coroutine Patterns


콜백과의 상호작용


Future대신 콜백을 사용하는 비동기 코드와 상호작용 하려면 호출을 task에 랩핑하세요. 이렇게 하면 콜백 인자가 추가되고 yield할 수 있는 Future를 반환합니다.

@gen.coroutine
def call_task():
    # Note that there are no parens on some_function.
    # This will be translated by Task into
    #   some_function(other_args, callback=callback)
    yield gen.Task(some_function, other_args)

블로킹 함수 호출하기


coroutine에서 블로킹 함수를 호출하는 가장 간단한 방법은 coroutine과 호환되는 Futures를 반환하는 ThreadPoolExecutor입니다.

thread_pool = ThreadPoolExecutor(4)

@gen.coroutine
def call_blocking():
    yield thread_pool.submit(blocking_func, args)

병행


coroutine 데코레이터는 list와 dict를 인식하고, 모든 Futures가 병렬로 대기합니다.(?)

@gen.coroutine
def parallel_fetch(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

@gen.coroutine
def parallel_fetch_many(urls):
    responses = yield [http_client.fetch(url) for url in urls]
    # responses is a list of HTTPResponses in the same order

@gen.coroutine
def parallel_fetch_dict(urls):
    responses = yield {url: http_client.fetch(url)
                        for url in urls}
    # responses is a dict {url: HTTPResponse}

인터리빙


때때로 즉시 Future를 저장하는 것이 yielding하는 것보다 유용하기 때문에, 기다리기 전에 다른 직업을 시작할 수 있습니다.

@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

이 패턴은 @gen.coroutine에서 가장 유용합니다. 이것은 백그라운드 프로세스로 실행되기 위해서 반드시 fetch_future = tornado.gen.convert_yielded(self.fetch_next_chunk())로써 호출되어져야합니다.(?)


루핑


루핑은 coroutine에서 까다롭습니다. for또는 while의 모든 반복에서 yield의 결과를 포착할 수 있는 방법이 파이썬에 없기 때문입니다. 대신, 다음 Motor 예제와 같이 결과에 접근하지 말고 루프 상태를 분리해야 합니다.

import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()

백그라운드에서 실행


PeriodicCallback은 일반적으로 coroutine과 함께 사용되지 않습니다. 대신에, coroutine은 while True: 루프문을 포함하고 tornado.gen.sleep을 사용합니다.

@gen.coroutine def minute_loop(): while True: yield do_something() yield gen.sleep(60) # Coroutines that loop forever are generally started with # spawn_callback(). IOLoop.current().spawn_callback(minute_loop)

때때로 더 복잡한 루프가 필요할 수도 있습니다. 예를들어, 이전 루프는 매 60+N초마다 실행됩니다. 여기서 N은 do_something()의 실행시간 입니다. 정확하게 60초마다 실행하려면, 인터리빙 패턴을 사용하세요.

@gen.coroutine
def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        yield do_something()  # Run while the clock is ticking.
        yield nxt             # Wait for the timer to run out.

1) gervent는 스레드를 사용해서 coroutine을 구현(흉내)낸 것이고 tornado는 완전히 비동기를 통해서 coroutine을 구현한 것 같습니다.

2) 제네레이터는 lazy한 실행을 수행합니다.




요약


솔직히 Coroutine에 대한 개념이 부족한 상태에, 영어 실력이 미약한 저로써는 이해가 잘 안되는 파트였습니다. 추후에 단계를 더 진행하며 보완하도록 하겠습니다.

Comments