Python异步asyncio库学习总结

异步I/O主要解决的问题是减少I/O操作的的等待时间,将原本CPU要等待的时间用来执行其它的任务,以此来提高性能,I/O可以是网路I/O和本地文件I/O。

网络I/O:对于网络底层来说,发出远程TCP/UDP请求后,数据要经过一条条网线,一个个路由器,到达目的地,回应数据然后还要原路返回,如果CPU执行的是同步的线程代码,那么这段时间内CPU不会继续执行该线程内的其它任务(或者说代码),而是在等待结果的返回。由此可看出对CPU资源的浪费。
本地文件I/O:磁盘相对于CPU来说运行是极为缓慢的,因为CPU的运行是电力,是瞬间的事情,而磁盘,特别是机械硬盘,电力驱使盘片转动,然后磁臂还要从0磁道移动寻址,属于机械运动,相比电力慢的要死,寻找到之后还要送到内存,如果数据不在同一个扇区或盘片,那么还要进行多次寻址。

I/O模型

Unix下的五种 I/O 模型,它们是随着时间层层递进的。为了安全内存分为内核区和用户区,I/O或socket的数据都要先到内核区,然后复制到用户区中用户程序的缓冲区中:

阻塞式 I/O :发起I/O请求之后就等待,占用但不耗费CPU的计算,但是浪费CPU的计算

非阻塞式 I/O:发起I/O请求后不等待,一直循环询问是否返回成功,占用和耗费CPU,效率比阻塞式低。但是在后面的代码不依赖前面的代码的结果时,效率比阻塞式高

I/O 多路复用:使用较多。发起I/O请求后不等待,不占用和不耗费CPU的计算,由用户发起select函数监听和处理(其实内部是调用了系统的select函数),相比非阻塞的循环,select能监听多个资源的状态,等资源完成之后由操作系统使用select函数通知某任务完成并返回可读条件,用户程序的recvfrom函数接收到之后就发起系统调用,将内核区的可读数据复制到用户区。该模式节省了内核等待数据的时间,但没有节省内核区到用户区的复制时间

信号驱动式 I/O:使用较少。和I/O多路复用差不多,只是将select函数换成了信号量函数sigaction,通过传递信号量的方式管理,其它的和I/O多路复用一致

异步 I/O:相比I/O多路复用,将select换成aio_read函数负责监听和响应,准备好数据之后不用通知app,立即将数据从内核区复制到用户区,减少了一点时间。但是没有减少许多,现在还是常使用技术较为成熟的I/O多路复用技术。但随着时间的推移,异步I/O会逐渐发展,预测会成为主流

I/O多路复用三个机制(扩展知识,知道即可)

select、poll、epol都是I/O多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但 select,pol,epol本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

select函数监视的文件描述符分3类,分别是 writefds、 readfds、和exceptfds。调用后 select函数会阻塞,直到有描述符就绪(有数据可读可写、或者有 except),或者超时( timeou指定等待时间,如果立即返回设为null即可),函数返回。当 select函数返回后,可以通过遍历 fdset,来找到就绪的描述符。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。 select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在 Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。

不同于 select使用三个位图来表示三个 fdset的方式,poll使用一个pollfd的指针实现。
pollfd结构包含了要监视的event和发生的 event,不再使用 select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。和 select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。
从上面看, select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的资源。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降

epoll是在Linux2.6内核中提出的,只有Linux支持,Windows不支持,是之前的 select和poll的增强版本。相对于 select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

epoll并不代表一定比select好

在并发高的情况下,连接活跃度不是很高(很明确一会儿就下线),epoll比select好
并发性不高,但连接很活跃(游戏连接上后访问很频繁),select比epoll好

Python协程

问题的引出:

I/O多路复用虽然发起I/O操作后不等待,而是切换到其它进程或线程,但这是线程或进程级的,代价相对来说还是很大;

由于线程之间的切换开销太大并且解决线程间同步时使用的锁使性能变小,所以我们希望能在单线程内部使用函数解决任务,而不是使用多线程;

如果使用普通的函数,普通函数内部就使用了栈的方式运行函数,而栈的先入后出的机制是同步的,就不能使任务异步运行了;

由于I/O和socket操作时CPU要等待较长时间,我们希望在发生I/O和socket操作时能暂停该任务转而执行其它任务,等数据完成之后能接收到通知进而继续执行。

GIL(global interpreter lock 全局解释器锁),Cpython中的一个线程相当于C语言的一个线程,gil使得同一时刻只有一个线程在一个CPU上执行。gil会根据执行的字节码行数以及时间片释放gil,在遇到 I/O 操作时也会主动释放。gil是线程级的,对多进程无影响,并且只在Cpython解释器中有,如 Jpython等解释器不存在。

为了解决以上问题,就出现了协程,协程:在单线程中使用函数,该函数可以暂停,然后保存当前的运行状态,并可以向暂停的地方传入值以实现信息在多个任务之间的传递,并且可以根据保存的状态恢复运行。 刚好生成器完美的符合了条件。

可以看出协程不是如进程和线程般由操作系统内部给出,而是要自己定义实现。而实现协程不只是只能用生成器,还可以使用其它库如asyncio库(3.4版本解释器才开始有),还可以使用async、await关键字(3.5版本解释器才开始有)

生成器

这里只介绍几点基本的生成器常识,想学懂生成器要自行去学

# 初级生成器
def Geturl():
    v = yield 'http://www.baidu.com'
    print(v)
    yield 8
    yield 9
    yield 10
    return 'hello'  # 若使用next或send函数会收到返回异常的信息


if __name__ == '__main__':
    url = Geturl()  # 初始化生成器
    print(next(url))  # 第一次调用时要使用next,若使用send调用要传递一个None,即url.send(None),否则报错
    print(url.send('我传递了值'))  # 第二次传递可以传递任何值
    print(next(url))
    url.close()  # 关闭生成器后,后面不能再next()了
    print(next(url))  # 会抛异常
    # for i in Geturl():
    #     print(i)

加了yield关键字的函数就是一个生成器。普通的调用方法就是先初始化,然后使用next()函数调用,如果想向里面传值,可以使用send()方法。还可以使用for循环的方式直接调用。

yield from 是yield的升级版,yield from后面可以加可迭代对象,也可以是迭代器,甚至是生成器。

生成器写一个完整的异步I/O是麻烦的,我们没必要在这上面浪费时间(除非你想学懂原理然后自己写一个第三方库),则这里只介绍了有这么一个东西

asyncio库

这里使用较为老式的写法,后面会一步步改进

future对象,一个偏底层的对象,在异步I/O中,我们发出I/O操作后,就跳转到另一个任务,当I/O结束之后,我们希望能监听该I/O操作的状态,并正常接收返回结果,future对象就是做这些东西的。然而新版本的asyncio库的task对象继承了该对象,并实现了转化为任务的功能。

事件循环,像上面socket编程般,我们希望有一个循环,能够对传进来的任务进行一一处理,直到所有任务执行完毕之后能结束循环

import asyncio

async def func_1():
    print(1)
    await asyncio.sleep(2)
    print(2)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(func_1()))

你可能看不懂这里的代码逻辑,没关系,后面会慢慢讲到。上面只有一个任务,而协程就是解决多任务时I/O的等待,所以上面的写法没有意义,如果写多个任务时,可能你会这么写

import asyncio

async def func_1():
    print(1)
    await asyncio.sleep(2)
    print(2)

async def func_2():
    print(3)
    await asyncio.sleep(2)
    print(4)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(func_1()))
loop.run_until_complete(asyncio.ensure_future(func_2()))

上面这样虽说能运行,但也是错的写法,因为这本质还是同步的代码,所以我们要将它打包

import asyncio

@asyncio.coroutine  # 使用装饰器,申明该函数是一个异步函数
def func_1():
    print(1)
    yield from asyncio.sleep(2)  # 遇到IO耗时操作,自动化切换到tasks中的其他任务
    print(2)

@asyncio.coroutine
def func_2():
    print(3)
    yield from asyncio.sleep(2)  # 遇到IO耗时操作,自动化切换到tasks中的其他任务
    print(4)

tasks = [
    asyncio.ensure_future(func_1()),  # 创建一个future对象并添加到列表中
    asyncio.ensure_future(func_2())  # asyncio.ensure_future是老版本的创建方法,asyncio.create_task是新本版的创建方法
]

loop = asyncio.get_event_loop()  # 创建事件循环
loop.run_until_complete(asyncio.wait(tasks))  # 将任务添加进事件循环队列,其中asyncio.wait方法将future对象转为任务
# 执行结果:1 3 2 4

执行以上程序你会发现,会立马输出 1 3 ,等过了两秒之后再立马输出 2 4 。先执行func_1函数,输出1后,就遇到等待操作,此时不会继续执行后面的输出2,而是停止当前任务,执行第二个任务func_2函数,输出3之后,也遇到了等待操作,也不会继续执行下面的输出4操作,然而此时事件循环中没有任务了,就要等待,等2秒后,底层的future对象监听到func_1任务等待结束,然后func_1函数恢复,继续执行后面的输出2操作,此时任务1结束,而事件循环中也没有其它要执行的任务,很快,func_2也等待结束,也恢复继续执行后面的输出4操作,任务2也结束了,此时事件循环中没有任务了,就会结束循环。

上面代码的细节后面还会继续讲

async、await关键字

async、awit 关键字在Python3.5版本中开始引入的关键字,它基于asyncio库实现了更好的体验,简便了写法。并且asyncio库的@asyncio.coroutine装饰器在3.8版本就被移除了,官方也推荐使用async、await关键字的写法实现异步操作。

import asyncio

async def func_1():
    print(1)
    await asyncio.sleep(2)
    print(2)

async def func_2():
    print(3)
    await asyncio.sleep(2)
    print(4)

tasks = [
    asyncio.ensure_future(func_1()),
    asyncio.ensure_future(func_2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# 执行结果:1 3 2 4

这里的执行效果和上面的一致,功能也一致。可以发现@asyncio.coroutine被替换成了async关键字,yield from换成了await关键字。

async关键字申明了该函数是一个异步函数,我们关注到的作用就只有这些。

await关键字只能在使用了async申明过的协程函数中使用,该关键字后面加一个可等待对象(什么是可等待对象?能被暂停的对象,像生成器那样可以被暂停)。可等待 对象有三种主要类型: 协程, 任务 和 Future.

执行到func_1任务时,输出1,执行到await asyncio.sleep(2)语句,由于asyncio.sleep()方法是一个可等待对象,执行该语句,发现它是一个耗时的I/O操作,就停止等待该func_1任务,然后到循环队列中执行下一个任务。

Task对象

官方对Task对象的解释:https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.Task

我们先看一个简单的应用例子


import asyncio

async def func_1():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "函数一的返回值"

async def func_2():
    print(3)
    await asyncio.sleep(2)
    print(4)
    return "函数二的返回值"

async def main():
    print("main开始")

    # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task1 = asyncio.Task(func_1())

    # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task2 = asyncio.Task(func_2())

    print("main结束")

    # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
    # 此处的await是等待相对应的协程全都执行完毕并获取结果
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()  # 创建事件循环
    loop.run_until_complete(main())  # 将任务添加进事件循环队列
# 运行结果:
# main开始
# main结束
# 1
# 3
# 2
# 4
# 函数一的返回值 函数二的返回值

也许你会感到奇怪,前面将任务传进事件循环队列中前,都先为协程函数创建一个future对象,然后再将该future对象用asyncio.wait( )方法转为可等待的任务对象。而这里直接传一个协程函数,可以吗?

首先看一个例子,下面结果表明,一个函数的执行总是先接收参数,当把一个函数的实现(不是函数地址)放进一个参数时,就会先执行该函数,再执行函数里面的逻辑

# demo1
def aaa():
    print('hello')

def bbb(a):
    print('world')

bbb(aaa())
# hello
# world

然后看看之前将future对象转为任务对象的时候asyncio.wait( )它在背后做了什么,看它的源码

async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
        if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_event_loop()

    fs = {ensure_future(f, loop=loop) for f in set(fs)}

    return await _wait(fs, timeout, return_when, loop)

他先检查传进来的值是否有错误,然后loop参数是None,所以要在事件对象(事件对象在创建事件循环时已经创建了)中获取一个事件循环,然后将我们传进来的future对象转为集合后再进行迭代,并将迭代的每一个值传进一个ensure_future( )方法中,同时将获取的事件循环作为参数传入,那么我们会想,如果我们前面在列表中不使用asyncio.ensure_future( )方法处理协程函数,让他在这里处理行不行?答案是可以的,但你会发现它们的任务执行顺序是无序的,因为用set( )方法转为集合,集合就是无序的。

import asyncio

async def func_1():
    print(1)
    await asyncio.sleep(2)
    print(2)

async def func_2():
    print(3)
    await asyncio.sleep(2)
    print(4)

tasks = [
    # asyncio.ensure_future(func_1()),
    # asyncio.ensure_future(func_2())
    func_1(),
    func_2()
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# 输出结果:3 1 4 2

然后我们再跟进return await _wait(fs, timeout, return_when, loop)里面,发现它里面就是将一个协程函数转为一个可等待对象,并返回了两个值,return done, pending ,await方法可以添加timeout参数,如果设置了timeout值,则意味着此处最多等待的秒,完成的协程返回值写入到done中,未完成则写到pending中,注意,如果爬取一个网页需要等待2秒才有返回值,而设置timeout为1秒,future对象没有获取到返回值,那么就是未完成。

现在看回Task对象的原理,官方的解释

一个与 Future 类似 的对象,可运行 Python 协程。非线程安全。

Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 完成,被打包的协程将恢复执行。

事件循环使用协同日程调度: 一个事件循环每次运行一个 Task 对象。而一个 Task 对象会等待一个 Future 对象完成,该事件循环会运行其他 Task、回调或执行 IO 操作。

使用高层级的 asyncio.create_task() 函数来创建 Task 对象,也可用低层级的 loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。

总的来说就是实现了前面的asyncio.ensure_future( )和asyncio.wait( )方法,同时还提供了许多控制协程的方法,如取消Task对象、判断Task对象是否完成、返回Task对象执行结果、添加一个回调函数将在Task对象完成后被执行,等方法。我们最关心的是返回值和回调函数,因为如写爬虫时总得要获取爬取的结果,总得要在获取结果后调用其它方法将结果写入到数据库中。

获取结果
该用法在上面已经用过了,ret1就是任务执行后返回的结果

task1 = asyncio.Task(func_1())
ret1 = await task1

回调函数

回调函数不是简单的指定一个普通函数就可以了,它的用法要使用到内置库functools,与前面有差别的地方我都做了注释

import asyncio
import functools  # 导入该库

async def func_1():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "函数一的返回值"

async def func_2():
    print(3)
    await asyncio.sleep(2)
    print(4)
    return "函数二的返回值"

def say(future, number):  # 必须有一个参数,名字任意,该参数指向的是Task对象
    print('我', number, '岁了')


async def main():
    print("main开始")

    task1 = asyncio.Task(func_1())
    
    task1.add_done_callback(functools.partial(say, number='6'))  # 添加回调函数,赋值时必须要指定参数

    task2 = asyncio.Task(func_2())
    print("main结束")

    ret1 = await task1
    ret2 = await task2

    print(ret1, ret2)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用、回调和异步调用。同步调用是一种阻塞式调用,调用方要等待对方执行完毕 才返回,它是一种单向调用;回调是一种双向调用模式,也就是说,被调用方在接口被调用时也会调用对方的接口;异步调用是一种类似消息或事件的机制,不过它 的调用方向刚好相反,接口的服务在收到某种讯息或发生某种事件时,会主动通知客户方(即调用客户方的接口)。回调和异步调用的关系非常紧密,通常我们使用 回调来实现异步消息的注册,通过异步调用来实现消息的通知。同步调用是三者当中最简单的,而回调又常常是异步调用的基础。

future和task

看了上面的代码,也许会发现,future和task好像差不多,为什么要存在两个?

future是一个底层的对象,是对协程的封装,Future对象提供了很多任务方法(如完成后的回调、取消、设置任务结果等等),但对于开发者来说这么底层的东西要想操作它,就要彻底搞懂它的每一个逻辑,这样成本太高。

task对象 class Task(futures._PyFuture):继承了future对象,并对future进行管理调度,所以推荐使用task对象进行创建任务。

优化代码
你会发现每次写事件循环都要写两行

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

开发者也知道这个问题,所以在Python 3.7新加接口asyncio.run( )代替上面两行,同时还包装了其它功能。

不过要注意一点,不能在asyncio.run( )方法之前使用asyncio.ensure_future( )方法或者asyncio.Task( )方法创建future对象,只能在之后创建,否则报错。

官方也说了不建议手动实例化Task对象,所以我们以后就使用create_task( )方法替代Task( )方法好了

# 写法一
import asyncio
import time

async def func_1():
    print(1)
    await asyncio.sleep(2)
    print(2)

async def func_2():
    print(3)
    await asyncio.sleep(2)
    print(4)

start_time = time.time()  # 开始运行的时间
tasks = [
    func_1(),
    func_2()
]

asyncio.run(asyncio.wait(tasks))  # 由于wait方法里面可以实现转化为future对象,我们可以不用在前面转化
print(time.time() - start_time)  # 结束的时间减去开始时间,可以发现总时间2秒
# 写法二
import asyncio
import time

async def func_1():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "函数一的返回值"

async def func_2():
    print(3)
    await asyncio.sleep(2)
    print(4)
    return "函数二的返回值"

async def main():
    print("main开始")
    task1 = asyncio.create_task(func_1())
    task2 = asyncio.create_task(func_2())
    print("main结束")
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)

if __name__ == "__main__":
    start_time = time.time()  # 始运行的时间
    asyncio.run(main())
    print(time.time() - start_time)  # 结束的时间减去开始时间,可以发现总时间2秒
# 写法三
import asyncio
import time

async def func_1():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "函数一的返回值"

async def func_2():
    print(3)
    await asyncio.sleep(2)
    print(4)
    return "函数二的返回值"

async def main():
    tasks = [asyncio.create_task(func_1()), asyncio.create_task(func_2())]
    done, pending = await asyncio.wait(tasks)
    print(done, pending)

if __name__ == "__main__":
    start_time = time.time()  # 始运行的时间
    asyncio.run(main())
    print(time.time() - start_time)  # 结束的时间减去开始时间,可以发现总时间2秒

因为我们更希望执行任务和返回任务结果时总是对应着创建任务的顺序,而第一种写法不能自己创建有顺序的task任务,所以一般不使用第一种写法。

第二种写法中,每次添加一个任务,不仅要添加进任务列表中,还要手动添加运行等待对象,不太方便。

第三种写法中。执行完毕的返回结果放在done集合中,与任务的创建顺序一致,但结果放在Task对象中,取的时候不太方便。如果不是特殊要求,我更加希望使用下面的第四种写法。

# 第四种写法,推荐使用该方法写

import asyncio
import time

async def func_1():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "函数一的返回值"

async def func_2():
    print(3)
    await asyncio.sleep(2)
    print(4)
    return "函数二的返回值"

async def main():
    tasks1 = (func_1(), func_2())  # 分组1
    tasks2 = [func_2(), func_1()]  # 分组2
    result_1 = await asyncio.gather(*tasks1)
    result_2 = await asyncio.gather(*tasks2)
    print(result_1, result_2)

if __name__ == "__main__":
    start_time = time.time()  # 始运行的时间
    asyncio.run(main())
    print(time.time() - start_time)  # 结束的时间减去开始时间,可以发现总时间4秒
# result_1的返回值:['函数一的返回值', '函数二的返回值']
# result_2的返回值:['函数二的返回值', '函数一的返回值']

asyncio.gather( )方法在基础上和asyncio.wait( )方法一致,但它还提供了分组、具体的返回值和任务的取消。一组内的运行是异步的,但不同分组的之间是同步的,所以运行上面的代码时间是4秒。asyncio.gather( )返回的结果不是一个对象,而是具体的返回值,不管分组是列表、元组和集合,它的返回值都放在列表中。gather方法原型为def gather(*coros_or_futures, loop=None, return_exceptions=False):,return_exceptions参数默认为False,代表当运行异步任务时,某一个任务出现了错误,就会立刻将异常返回给等待它的任务,就等于该任务被取消了,但其它的任务不会被影响而继续运行;当参数设置为True,一个任务出现了错误,返回的异常信息作为运行结果,其它的任务同样不会受影响;没有说参数设置为哪个好,一切都要看需求。

读懂了前面的代码,就会有疑问了,实现协程时async申明一个函数为异步函数很容易,但await后面要求要加可等待对象,而平常写的代码基本不会出现可等待对象。我们有没有方法将一个任意的普通函数转为一个可等待对象吗?可以的。

在3.9版本中官方使用的是asyncio.to_thread( )方法,而在3.7版本中没有该方法,3.7使用的是进程池或线程池的方法

concurrent.futures对象

我们可以使用该对象将一个普通函数包装为一个异步的线程或进程

import time
from concurrent.futures.thread import ThreadPoolExecutor
# from concurrent.futures.process import ProcessPoolExecutor


def func_1(start_time, number):
    time.sleep(1)
    print('我是第', number, '个', time.time()-start_time)
    return "函数一的返回值"


if __name__ == "__main__":
    start_time = time.time()  # 始运行的时间
    pool = ThreadPoolExecutor(max_workers=5)  # 创建线程池
    # pool = ProcessPoolExecutor(max_workers=5)  # 创建进程池
    for i in range(1, 11):
        pool.submit(func_1, time.time(), i)  # 提交一个任务,第一个参数是普通函数的本身,后面的所有参数都是普通函数的参数

    print('总的运行时间', time.time() - start_time)  # 结束的时间减去开始时间,可以发现总时间2秒

你可能会感到疑惑,创建池的时候定义最大为5了,为什么能提交10个任务?因为定义的是并发时最大的运行任务,提交的个数无关,只是在运行时只能并发运行5个,剩余的5个要等待。提交完10个任务到池中之后循环结束,继续运行下面的代码。

线程池的运行结果如下。虽说输出结果有点乱,但看结果可知,有五个是1秒的,普通函数要暂停一秒才结束,然后才轮到其它任务运行。也可以看出提交是无限制的,很快就提交完了。整个运行时间为2.115秒。


进程池的运行结果如下。通过第一个输出可知,进程的提交创建是比线程的创建极为费时的。通过输出运行秒数的小数位可知,进程的运行和撤销的一系列运行都是比线程慢的。整个运行时间2.865秒。

虽说由于性能的问题,每个人的运行时间是不同的。协程是解决多I/O操作等待的问题,然而在多线程和多进程中,多线程是最适合解决多I/O的,而多进程是最适合解决多计算的。不谋而合,协程用在多线程中是最合适的。

uvloop

在Windows中直接使用pip安装会失败,建议在Linux中安装运行,同时Linux中的Python解释器版本要3.7版本

一个能大幅度提高异步运行速率的库,只需添加下面两句就可以了,导入库和替换事件循环。由于运行的任务很少,体现不出它的高速。以后写异步编程时,最好添加这两句,他在concurrent.futures对象中不起作用,只在asyncio库中起作用,在其它的真正异步库中也可以使用它

# encoding: utf-8
import asyncio
import uvloop  # 导入该库
import time

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())  # 替换原本的事件循环

async def func_1():
    print("111")
    await asyncio.sleep(2)
    print("222")
    return 'hello'

async def func_2():
    print("333")
    await asyncio.sleep(2)
    print("444")
    return 'world'

async def main():
    result = await asyncio.gather(func_1(), func_2())
    print(result)

if __name__ == "__main__":
    start_time = time.time()
    asyncio.run(main())
    print(time.time() - start_time)

异步迭代器

暂时没时间写,下回补

异步上下文管理器

暂时没时间写,下回补

应用

aiomysql库

当我们进行MySQL操作时,写的代码是同步的代码,我们可以用上面的concurrent.futures对象转换为异步的线程池或进程池,但我们依旧觉得相比协程来说很慢,所以有人写了aiomysql第三方库,使用协程的方式操作mysql
官方文档:https://aiomysql.readthedocs.io/en/latest/

import asyncio
import aiomysql
# import uvloop  # 注意:如果Windows安装不了该库,就不使用该库


# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())  # 使用uvloop的事件循环队列

async def gai():  # 修改操作
    con = await aiomysql.connect(host='127.0.0.1', user='root', db='xiugai', password='123456')# 连接
    cur = await con.cursor()  # 获取游标
    await cur.execute('insert into test value("1", "zhong")')  # 执行sql语句
    await con.commit()  # 提交sql语句
    await cur.close()  # 关闭游标
    con.close()  # 关闭连接


async def cha():  # 查询操作
    con = await aiomysql.connect(host='127.0.0.1', user='root', db='xiugai', password='123456')# 连接
    cur = await con.cursor()  # 获取游标
    await cur.execute('select * from test')  # 执行sql语句
    result = await cur.fetchall()  # 获取表中的所有行
    print(result)  # 输出查询的结果
    await cur.close()  # 关闭游标
    con.close()  # 关闭连接


async def main():
    tasks_1 = [gai()]  # 分组一
    tasks_2 = [cha()]  # 分组二
    await asyncio.gather(*tasks_1)
    await asyncio.gather(*tasks_2)


if __name__ == "__main__":
    asyncio.run(main())

aiomysql的使用和pymysql的使用基本一致,只是换一个名字,并且前面加关键字await,只要涉及到远程的操作基本都要添加关键字。具体更多的用法,请查看官网,不过官网写的示范代码文档使用的是旧版本的协程写法,如果看懂了前面讲的,怎么切换使用大概是没问题的。

aiofiles库

官方文档:https://pypi.org/project/aiofiles/

官方对该库应用场景描述如下:

aiofiles是用Python编写的Apache2许可库,用于处理asyncio应用程序中的本地磁盘文件。

普通的本地文件IO处于阻塞状态,无法轻松,便携地使其异步。这意味着执行文件IO可能会干扰asyncio应用程序,该应用程序不应阻塞正在执行的线程。aiofiles通过引入支持将操作委派给单独的线程池的异步版本的文件来帮助解决此问题。

我的理解:该库主要的应用场景不是解决高并发的读写文件,而是解决在进行其它异步操作时,原本整个程序的所有任务都是异步的,一旦使用普通的方式读写文件,而普通的方式读写文件是阻塞式的操作,这样读写文件的任务和其它任务之间是同步的,也就是说其它异步任务要等待该文件的操作任务操作的完成才能继续运行,这样破坏了整个异步正常的运行。而平常的写的文件操作代码,一般的操作都是依赖于读写完成后的结果,思维上也就是同步操作,所以平常中没必要使用该库

读写小文件(2.5KB),经过测试,发现平常中使用的同步(0.0009605884552001953秒)的写法比异步(0.02627420425415039秒)的写法快的多,读大文件(60MB),经过测试,发现都差不多。也就是说,在异步程序中,使用该库读写的文件越大越多,体现速度的就越明显

使用该库打开文件可以使用普通方法,也可以使用上下文管理器的方法

import asyncio
import aiofiles
# import uvloop  # 注意:如果Windows安装不了该库,就不使用该库


# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())  # 使用uvloop的事件循环队列

async def write_file():  # 写文件操作,普通方法
    f = await aiofiles.open('./temp1.txt', mode='w', encoding='utf-8')
    await f.write('你好,世界!!!')
    f.close()


async def read_file():  # 读文件操作,上下文管理器方法
    async with aiofiles.open('./temp1.txt', mode='r', encoding='utf-8') as f:  # 异步上下文方式打开读取文件
        value = await f.read()  # 读取操作
    print(value)


async def main():
    tasks_1 = [write_file()]
    tasks_2 = [read_file()]
    await asyncio.gather(*tasks_1)
    await asyncio.gather(*tasks_2)


if __name__ == "__main__":
    asyncio.run(main())

读取文件支持异步迭代的方法

import asyncio
import aiofiles


async def read_file():  # 读文件操作,上下文管理器方法
    async with aiofiles.open('./temp1.txt', mode='r', encoding='utf-8') as f:  # 异步上下文方式打开读取文件
        async for i in f:  # 异步迭代文件内容
            print(i)


async def main():
    tasks_2 = [read_file()]
    await asyncio.gather(*tasks_2)


if __name__ == "__main__":
    asyncio.run(main())

aiohttp库

官方文档:https://docs.aiohttp.org/en/stable/

写爬虫你可能会使用到 cchardet 库,用于识别爬取的未知格式字节属于什么编码格式。 pip install cchardet

为了通过客户端API加快DNS解析速度,您也可以安装 aiodns。强烈建议使用此选项:pip install aiodns

下面复制了官方的代码,如果学过requests库那么学这个就很容易,但在创建对象的写法上有点区别,建议去官网学

import aiohttp
import asyncio

async def main():

    async with aiohttp.ClientSession() as session:
        async with session.get('http://python.org') as response:

            print("Status:", response.status)
            print("Content-type:", response.headers['content-type'])

            html = await response.text()
            print("Body:", html[:15], "...")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

https://blog.csdn.net/qq_43279457/article/details/110906565