Preface

2019 年差不多也是这个时候,我写了 一篇文章 ) 概述 python 语言本身的异步机制。随着认知的增长,重读文章和当时为了说明问题的实验代码,我发现了很多理解不到位甚至是错误的地方。故写此文一方面来勘误,另一方面也扩展一些东西。去年 (2019 年) 我接触了 Go 语言 (Go Lang),同时在实习的项目中多次使用了 python 多线程实现异步,对于异步常见场景的使用和解决方案都有了一些直观的认识。

故作此文,比较常见的异步案例来讲述 程序语言 级别的异步实现。主要比较 python 的 asyncio 方案和 go 语言的 goroutine。两者实现的原理不同、使用范式也不太一样。但共同点是他们是从编程语言层面实现了异步,i.e. 没有使操作系统来完成子程序的上下文切换和线程切换。本文不着重探究两种语言方案的实现原理,主要探索如何在对应场景下使用。相信读完本文后读者大致会对 asynciogoroutine 的编程风格有一大致了解。

程序级别异步概述

首先还是要简述下什么是异步,以及异步重要的原因。

异步是解决阻塞 (Block)程序的一种方式,因此先介绍下什么样的程序是阻塞的。

阻塞程序一般会造成 CPU 空闲,程序停止等待 IO 操作。常见的 IO 操作有:数据库连接、数据库增删改查、文件的打开与读取、http 连接与请求 (底层表现为 socket 连接与数据接收);常见的场景如爬虫中 Web 数据流的接收 (相信这个场景是大多数人尝试写过并且熟悉的),你有很多页面 (任务) 需要爬取,而 http 只能建立对每个页面建立一次连接然后传输数据。阻塞的程序会线性地执行每一个页面的 "数据接收 -- 处理" 操作,而数据接收阶段是不占用 CPU 核心的,这就导致了算力的浪费。

非阻塞程序不会因为某一操作能停止执行 (CPU 空闲),而会保证 CPU 核心始终在工作状态且符合我们的逻辑需求。上面爬虫的例子中,如果在等待某一页面数据接收的同时,程序能执行其他页面的处理操作,这就是非阻塞程序。我们把使得程序非阻塞的方式称为异步 (asynchrounous).

实现异步最简单直接的方式即通过多线程、多进程 + 回调函数。即将能够阻塞程序的耗时的 IO 操作挂起到一个 subprocess\subthread, 而主线程\ 进程执行主要的逻辑。这就需要操作系统来完成线程的切换和子例程上下文的切换。

而程序语言级别实现的异步是指异步任务的切换由操作语言完成,且使用单线程或较少的线程。这种能随时停止并重新激活执行的子例程 (或者干脆理解为子函数) 称为协程 (coroutine)。协程使得程序级的异步实现成为了可能。

下面我用python3.7中的简单例子做一说明

async def task_A():
    await asyncio.sleep(1)
    print("task_A finished!")

async def task_B():
    await asyncio.sleep(1)
    print("task_B finished!")

asyncio.gather(*[task_A(), task_B()])

上述的代码会同时执行两个协程任务,看起来就像是开了多线程一样,但他们的确是在一个线程中执行。要说明的就是await关键字,他只能出现在协程中,他表示当前协程即将要等待,可以让出 CPU 了,此时 asyncio 维护的事件循环会执行其他协程,每个 await 后的任务完成后,事件循环会感知,协程会在此处恢复继续执行。由此要说明的是仅仅依靠async\await只能实现协程,不一定能实现并行。

关于 asyncio 的底层原理,有一篇文章[4] 写的很精彩,大家可以详细阅读,结论就是 asyncio 使用了操作系统自身的 epoll 机制维护事件循环。

await表示接下来等待的就是阻塞的任务,任务完成后 await 后续的程序才会接着执行。asyncio充当事件循环的角色。如果这两个函数是这样的:

import time
def task_A():
    time.sleep(1)
    print("task_A finished!")

 def task_B():
    time.sleep(1)
    print("task_B finished!")

则除了使用多线程没办法将两个任务并行。

目前大家应该能理清楚阻塞、异步、协程的关系。其实搞不清楚也没太大关系,我们主要掌握的技能是识别需要异步的场景,并选用合适的技术解决他。正如上文所说,多线程是最直观也是大多数人都掌握的方案,也是目前工业界普遍使用的方案,如果异步的需求不多,多线程足以满足需要。但程序级别的实现是有性能优势的,下文组要介绍 python 语言和 go 语言是如何实现的。

asyncio 与 goroutine 实现场景

场景简述

我设置了一个在后端 RestAPI 很常见的一个业务场景: "耗时任务管理"。这个需求对于后端的朋友们一定不陌生,他包含的需求有:

  • POST 创建任务,返回任务 ID
  • GET 查询任务的完成度、结果、开始时间、结束时间
  • PUT 修改任务的状态

异步的场景主要在第一步,创建任务后接口需要立马返回任务 ID,而任务放在后台执行,需要能监测到他的执行进度。

asyncio 方案

为什么不说是 python 方案?因为 python 不止 asyncio 一种异步方案,在此之前已经有其他方案。asyncio 是目前官方推崇的一种,希望接下来 python 的异步生态圈以他为核心展开。

下面就是核心 POST 方法,使用的库含有aihttp, tortoise

@routes.post('/task')
@swagger_path("tasks.yaml")
async def create_task(request):
    # 等待请求数据的接收
    data = await request.json()
    task_name = data.get('task_name', '')
    timestamp = datetime.datetime.now()
    # 等待连接数据库
    await init_db()
    # 数据库异步上下文
    async with in_transaction() as conn:
        task = Task(task_name=task_name, start_time=timestamp)
        # 等待数据写入
        await task.save()
        task_id = task.id
    # 异步执行任务
    asyncio.create_task(perform_task(task_id)) 
    # 当前返回
    return web.json_response({"status": "ok", "task_id": task_id})

 async def perform_task(task_id):
    """simple task
    """
    await asyncio.sleep(3)
    await update_status(task_id, "30%")
    await asyncio.sleep(3)
    await update_status(task_id, "60%")
    await asyncio.sleep(10)
    await update_status(task_id, "100%")
    await init_db()
    async with in_transaction() as conn:
        end_time = datetime.datetime.now()
        await Task.filter(id=task_id).using_db(conn).update(end_time=end_time)
    print(f'Task {task_id} ended!')

  async def update_status(task_id, status):
    await init_db()
    await Task.filter(id=task_id).update(status=status)

代码很简单,其实只有两点是重中之中:

  • await 的用法,写 python 异步程序时刻谨记要将需要挂起执行等待的任务使用 await
  • 并行的方法,使用 asyncio 提供的 Task 对象

第二点 Task 对象有必要解释一下:

# 异步执行任务
asyncio.create_task(perform_task(task_id)) 
# 当前返回
return web.json_response({"status": "ok", "task_id": task_id})

创建一个 Task 对象后,会自动注册到时间循环中,不阻塞当前的程序,这里很容易理解错写成:

await perform_task(task_id)
return web.json_response({"status": "ok", "task_id": task_id})

这样 GET 方法不会立马返回 ID 的,他会阻塞在 await 这里。可能这块大家就会疑惑,await 不是已经将程序的执行权交出了吗为什么还能在这里阻塞?

需要明白的是,await 将执行权交由主程序执行其他协程 (或可等待对象,包含 coroutine, Task, Future), 而 GET 方法本身就是当前主程序(aiohttp server 程序)的一个子协程,其内的 await 一定是阻塞的。

这里当然也不要把Task对象理解为非阻塞的,如果写

await syncio.create_task(perform_task(task_id))

他照样是阻塞的。

总结下asyncio方案需要注意的地方:

  • await 一个可等待对象。等待后面的程序会阻塞,如果等待一个非可等到对象,程序会解释错误。
  • 不要期望 await 能帮住你实现并行,await 只能帮你实现协程,而并行需要借助 asyncio 的事件循环,Task 对象是 asyncio 提供的一种高级对象,除了自动注册执行,他还有更多功能。[1]
  • 协程内使用异步实现的库,不能等待一个非异步的库方法,比如 request, sqlAlchemy, 标准文件读写 open,这些都是阻塞的、不可等待的。需要对应换成基 asyncio 实现的非阻塞库。

第三点也是目前 asyncio 限制的地方,毕竟大多数人熟悉的库都不能使用了。从我去年写那篇文章,一年时间过去了,python 的 aio 生态圈与去年比变化不大,3.8 版本也没有在这里大刀阔斧地改进,国内熟悉并使用 python 这一套机制的人也不是很多。似乎大家还是习惯多线程和回调机制。

此外的原因,我想可能是这种编程风格,将回调写成同步的形式,设计者认为这是优雅的,而也有使用者认为这是别扭的。

goroutine 方案

不要通过共享来通信,而要通过通信来共享。

---- Go 语言设计哲学

go 语言实现同样的 API 我选择了gin框架,数据库使用了标准库database/sql ;

跟 python 不同,go 语言本身从设计之初就支持异步,goroutine是一套语言标准机制,所以他不存在阻塞库与异步方法不兼容的问题。goroutine 的原理和使用方法可以详细参考[5] 。

按照我的理解,我认为goroutine既是多线程 + 异步的方式,如果在asyncio上再加一层多线程,差不多性能上能相同。但更多的是使用方法上的不同:

package main

import (
    "fmt"
    "time"
    "strconv"
    "github.com/gin-gonic/gin"
    "database/sql"
    _ "github.com/go-sql-driver/mysql"
)
func main() {
    r := gin.Default()
    r.GET("/task", fetchTask)
    r.POST("/task", createTask)
    r.Run(":8080")
}
func createTask(c *gin.Context) {
    db, _ := sql.Open("mysql", DB)
    defer db.Close()
    taskName := c.DefaultPostForm("task_name", "default")
    res, _ := db.Exec("insert into task(task_name, start_time) values(? ,?)", taskName, time.Now())
    task_id, _ := res.LastInsertId()
  // 执行任务
    go func (task_id int64) {
        db, _ := sql.Open("mysql", DB)
        defer db.Close()
        fmt.Println("Work.....")
        time.Sleep(1e8)
        _, err := db.Exec("update task set status='50%' where id=?", task_id)
        if err != nil {
            fmt.Println("Error......", err)
        }
        time.Sleep(5e8)
        fmt.Println("Work.....")
        db.Exec("update task set status='100%' where id=?", task_id)
        db.Exec("update task set end_time=? where id=?", time.Now(), task_id)
    }(task_id)
  // 直接返回,goroutine非阻塞
    c.JSON(200, gin.H{
        "status": "ok",
        "task_id": task_id,
    })
}

与 python 的主要区别是两个关键字goawait, await 后的程序是阻塞的,go func () 会切换出一个 goroutine 执行任务,不阻塞当前程序。从逻辑上看,goroutine 与多线程一样,区别在于变量的通信和同步方式(示例场景不涉及变量的通信)

await 后阻塞程序的好处在于这迫使我们写出的程序是单线程的,数据的同步和共享变得很自然。缺点在于事件的并行是非抢占式的,他需要 await 来显示地 "让出"cpu 核心,不太符合我们一般的思维。我们总是期望计算机自行帮助我们调度事件。在之后的工作中,我也多的试着将使用多线程完成的场景改用 asyncio 实现,可能真正的熟悉这套用法后会体会到他本身的优雅。

goroutine 的优势在于更符合我们熟悉的多线程编程思维,而数据的同步和共享可以通过 channel 机制很好地解决。缺点在于 goroutine 是半抢占式的,他使用哪个核心,是否切换线程这都是用户无法控制的。

以上两个 API 完整的功能代码可以从 这里 找到;

总结,两种风格比较

个人观点,正是 go 语言自然的设计使得为更多开发者接受,而 python 的异步因为历史原因虽然一直在努力但仍未大家广泛接受;我曾试图给其他人解释 goroutine 的用法,就按照多线程来理解很容易被接受;而 python 的async/await关键字要彻底能够清除还要从yield\yield from慢慢解释。

但我认为最重要的不是具体的实现方法,而是异步的思维。如何使得自己要写的任务逻辑异步且并发、高效地使用所有核心,不放任程序阻塞在 IO 任务,当然这往往比写同步的逻辑需要多耗费时间和脑力。

最后扯点别的

《大象希形》,这是我在本科阶段学习 UML 课程时所使用的的教材,因为所讲理论过分抽象,这门课也一致被我们认为最具玄幻色彩的课程。他一度让我怀疑软件理论世界是一门艺术而非科学。最后一次课老师关于 "器" 与 "术" 关系的探讨深刻影响了我的价值观。

学习 CS 至今,从编程语言到框架、软件。我渐渐明白技术栈永远是最不重要的。这也是我放弃就业而读研的原因,回望四年本科,接触的工具很多,似乎真正的道理还没参透,着实惭愧。

Reference