异步 (asynchronous) 编程模式是相对于同步方式的另一种编程思路。 C10K 的问题提出后, 各种编程语言都出现了解决高并发的技术栈, 而早在 Python2 时期, Twisted、Tornado 和 Gevent 这三个库用不同的技术路径解决了高并发。[1] 其中就有用到 Python 的一些异步实现方法。 此文将对异步编程的基本思想和 Python 的实现方式进行阐述。

[目录]

  • 概念解释,同步 \ 异步 \ 阻塞 \ 非阻塞 \ 协程
  • 高并发问题解决方案:多线程与异步 IO
  • asyncio 解决的问题
  • Python 异步 IO 生态圈
  • 并发哪家强?应用:异步爬虫测试

概念解释,同步\ 异步\ 阻塞\ 非阻塞\ 协程

同步异步 (Synchornous\Asynchronous)

这一概念是针对主程序来说的,如果主程序遇到阻塞的任务时选择等待,那么这种行为就是同步的, 如果不等待选择执行其他的任务,就是异步的;

阻塞非阻塞 (Blocking\Unblocking)

这一概念是针对任务来说的, 如果某项任务在执行费时 IO 操作时不能挂起跳出(不能把执行权归还主程序)那么此项任务就是阻塞的, 相反就是非阻塞的。

其实不必将上面四个概念区分的很清楚,只需明白异步非阻塞编程的基本思路是将IO 密集型的任务时间节省出来让 CPU 尽可能多地去完成计算密集型任务

协程 (coroutines)

Python 最早是用生成器写的协程,3.4 后可以用 asyncio.coroutine 装饰一个协程,3.5 加入了新的关键字async\await协程成为新的语法。3.6 还引入了异步生成器。

协程的定义由很多,我比较认同可以挂起中断的函数这一说法,前面概念中讲到非阻塞的任务是可以挂起中断的,那么协程正可以实现此项任务;

协程包含两种情况:

  • 协程函数:async def 或者 @asyncio.coroutine
  • 协程函数所返回的对象

协程的运作方式:

  • 通过 result = await future 或者 result = yeild from future,悬挂协程,直到 future 完成,获取 future 的结果 / 异常
  • 通过 result = await coroutine 或者 result = yeild from coroutine 等待另一个协程的结果(或者异常,异常会被传播)。
  • returen expression 返回该协程的结果,被 await,或者 yield from 获取。
  • raise exception,抛出异常,被 await,或者 yield from 获取。

协程的典型应用案例,摘自《Fluent Python》

# 计算移动平均值

from collections import namedtuple
Result = namedtuple('Result', 'count average')
# 子生成器
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total/count
        return Result(count, average)

# 委派生成器
def grouper(results, key):
    while True:
        results[key] = yield from averager()

# 客户端代码,即调用方
def main(data):
    results = {}
    for key, values in data.items():
        group = grouper(results, key) 
        # 协程需要激活
        next(group)
        for value in values:
            # 传递值给自生成器
            group.send(value) 
        group.send(None) # 重要! 

    report(results)

# 输出报告
def report(results):
    for key, result in sorted(results.items()):
    group, unit = key.split(';')
    print('{:2} {:5} averaging {:.2f}{}'.format(
    result.count, group, result.average, unit))

data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == '__main__':
    main(data)

调用协程函数并不能使该协程运行。调用协程函数所返回的协程对象,在被你安排执行之前,不会做任何事情。有两种方式可以启动它:[2]

  • 通过在一个已经启动的协程中调用:await coroutine 或者 yield from coroutine
  • 或者通过 ensure_task () 以及 loop.create_task () 安排协程的执行。(使用 asyncio 库)

高并发问题解决方案:多线程与异步 IO

其实解决高并发就是实现的程序的异步和非阻塞,而多线程\ 多进程是大多数比较熟悉的方案,将阻塞任务挂起到多个线程, 由操作系统实现线程 \ 进程间的调度concurrent.futures对象的出现使得多线程的非阻塞任务可以使用回调函数, 参考下例:

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
from concurrent.futures import Future
from multiprocessing import Pool
from functools import partial
import time

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)

# 回调函数
def done_callback(msg, future):
    print(msg + 'this is called by callback!', future.result())

urls = [3,2,4]

all_task = [executor.submit(get_html, (url)).add_done_callback(partial(done_callback, 'XXX')) for url in urls]
wait(all_task, return_when=FIRST_COMPLETED)

executor.submit 返回一个 future 对象,这个对象可以存储结果、取消、添加回调等等。熟悉前端的朋友们肯定想起来这与 javascipt 的 promise 对象的思想一致;

还有一种常见的写法是 (使用进程与线程池,减少频繁创建线程):

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

def fetch(a):
    r = requests.get(URL.format(a))
    return r.json()['args']['a']

def requests_threadpool():
    start = time.time()
    # 自动等待完成
    with ThreadPoolExecutor(max_workers=3) as executor:
        for num, result in zip(NUMBERS, executor.map(fetch, NUMBERS)):
            print('fetch({}) = {}'.format(num, result))

多进程 ProcessPoolExecutor的接口与线程基本相同;他们实现异步非阻塞的问题是:

  • 线程、进程的创建与切换由 os 完成,会消耗一定的时间和内存。
  • 需写大量的回调函数

现在希望用单线程的方式实现异步和非阻塞,Python 的协程实现了非阻塞任务,现在只需要一个任务循环机制就可以把任务的调度交由程序本身来完成,不经过操作系统调度可省下不少的时间。

asyncio 解决的问题

Python3.4 正式加入的 asyncio (读音请自行谷歌) 为单线程实现异步提供了任务循环机制,同时为协程提供了一些包装接口,使得协程不许要进过 send\next 等繁琐操作。同时引入了可等待对象 (Awaitable)的概念;

可等待对象要实现的协议有aiter, anext,aenter,aexit;asyncio 接口可以直接用的可等待对象是:

  • 协程
  • aysncio.future 对象 (底层 API)
  • Task 对象(高级 API)

future 对象对协程包装成结果容器,使其可以像多线程中的 future 一样可以使用回调等功能;Task 是 future 的子类,用于同时启动多个协程 \future

对于这三个对象不理解的可以直接看官方文档中的阐述,其实大部分情况下不必使用底层的 future 对象;

几个 asyncio 常用的接口:

  • asyncio.run (coro, *, debug=True) 3.7 这个函数可以视当前 loop 的情况启动协程
  • asyncio.create_task (coro) 3.7; 将协程加入到任务队列中去立即执行
  • awaitable asyncio.gather (*aws, loop=None, return_exceptions=False) 3.7;在队列中执行可等待对象
  • coroutine asyncio.wait (aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 3.7; 按照条件等待可等待对象的完成,是一种阻塞写法;

上面的接口都是 3.7 出现的,之前的写法中还会使用带队列 loop, 在 3.7 中致力于取消 loop 的写法,使得用户更容易上手;

再次回顾一下 asyncio 最主要解决的问题:为可等待对象提供了调度多列,使得用户可以像写同步代码一样实现异步非阻塞操作,回避了写回调函数;

Python 异步 IO 生态圈

协程中不能写阻塞的代码,所以就不能使用阻塞的任务和库,如 requests, pymysql 等, 非得使用也可以,异步 + 多线程的方式仍然有效,而且 asyncio 也跟多进程一样实现了他自己的同步机制,一定程度上保证线程安全;但是我们都希望所有的任务都可以使用异步完成,目前,aio 生态圈正在逐步完善,已经有如下项目:

Web 框架

Libraries to build web applications.

  • aiohttp - Http client/server for asyncio (PEP-3156).
  • sanic - Python 3.5+ web server that's written to go fast.
  • Quart - An asyncio web microframework with the same API as Flask.
  • Vibora - Performant web framework inspired by Flask.
  • cirrina - Opinionated asynchronous web framework based on aiohttp.
  • autobahn - WebSocket and WAMP supporting asyncio and Twisted, for clients and servers.
  • websockets - A library for building WebSocket servers and clients in Python with a focus on correctness and simplicity.
  • Tornado - Performant web framework and asynchronous networking library.
  • Japronto! - Experimental http toolkit built on top of uvloop and picohttpparser.
  • Starlette - A lightweight ASGI framework/toolkit for building high performance services.
  • uvicorn - The lightning-fast ASGI server.

消息队列

Libraries to implement applications using message queues.

  • aioamqp - AMQP implementation using asyncio.
  • aiozmq - Asyncio (pep 3156) integration with ZeroMQ.
  • crossbar - Crossbar.io is a networking platform for distributed and microservice applications.
  • asyncio-nats - Client for the NATS messaging system.
  • aiokafka - Client for Apache Kafka.

数据库驱动

Libraries to connect to databases.

  • asyncpg - Fast PostgreSQL Database Client Library for Python/asyncio.
  • asyncpgsa - Asyncpg with sqlalchemy core support.
  • aiopg - Library for accessing a PostgreSQL database.
  • aiomysql - Library for accessing a MySQL database
  • aioodbc - Library for accessing a ODBC databases.
  • motor - The async Python driver for MongoDB.
  • aioredis - aio-libs Redis client (PEP 3156).
  • asyncio-redis - Redis client for Python asyncio (PEP 3156).
  • aiocouchdb - CouchDB client built on top of aiohttp (asyncio).
  • aioinflux - InfluxDB client built on top of aiohttp.
  • aioes - Asyncio compatible driver for elasticsearch.
  • peewee-async - ORM implementation based on peewee and aiopg.
  • GINO - is a lightweight asynchronous Python ORM based on SQLAlchemy core, with asyncpg dialect.
  • Tortoise ORM - native multi-backend ORM with Django-like API and easy relations management.

网络

Libraries to communicate in your network.

  • AsyncSSH - Provides an asynchronous client and server implementation of the SSHv2 protocol.
  • aiodns - Simple DNS resolver for asyncio

测试

Libraries to test asyncio based applications.

  • aiomock - A python mock library that supports async methods.
  • asynctest - Enhance the standard unittest package with features for testing. asyncio libraries
  • pytest-asyncio - Pytest support for asyncio.
  • aresponses - Asyncio http mocking. Similar to the responses library used for requests
  • aioresponses - Helper for mock/fake web requests in Python aiohttp package.

可选择的队列

Alternative asyncio loop implementations.

  • uvloop - Ultra fast implementation of asyncio event loop on top of libuv.
  • curio - The coroutine concurrency library.

其他

Other awesome asyncio libraries.

  • aiofiles - File support for asyncio.
  • aiodebug - A tiny library for monitoring and testing asyncio programs.
  • aiorun - A run() function that handles all the usual boilerplate for startup and graceful shutdown.
  • aioserial - A drop-in replacement of pySerial .
  • aiozipkin - Distributed tracing instrumentation for asyncio with zipkin

文档

Documentation, blog posts, and other awesome writing about asyncio.

讨论

Recordings of awesome talks about asyncio.

并发哪家强?应用:异步爬虫测试

对着 asyncio 的文档示例多理解几个程序后, 我们就可以写几个 IO 密集操作的程序来测试与喜爱各种方案的效率了,我编写了一个简单的爬虫,没有写储存的 IO,只测试请求的 IO。分别用了以下几个方案:

  • requests + ThreadPool
  • requests + ProcessPool
  • asyncio + requests + ThreadPool
  • asyncio + requests + ProcessPool
  • asyncio + aiohttp +SingleThread
  • asyncio + Multiporcess (这个我直接选用了库 aiomultiporcess )

还有一个国内开发者基于 asyncio 和 aiohttp 写的一个 scrapy-like 的爬虫框架 ruia , 速度也不错;

完整的代码 在这里

最终结果:

可以看出在 requests 库 + 多线程上面套用异步是无效的,而 aiohttp 的多线程并没有效率上的明显提升。

参考资料