Python学习笔记-第12天:异步编程(2)和单元测试

第十二天 异步编程(2)和单元测试

今天计划学习Python的多线程编程异步编程,学习项目及练习源码地址:
GitHub源码

协程

参见昨天的学习记录

无阻塞

异步程序依然会假死freezing

freezing案例:

import asyncio
import time
import threading

#定义一个异步操作
async def hello1(a,b):
    print(f"异步函数开始执行")
    await asyncio.sleep(3)
    print("异步函数执行结束")
    return a+b

#在一个异步操作里面调用另一个异步操作
async def main():
    c=await hello1(10,20)
    print(c)
    print("主函数执行")

loop = asyncio.get_event_loop()
tasks = [main()]
loop.run_until_complete(asyncio.wait(tasks))

loop.close()

'''运行结果为:
异步函数开始执行(在此处要等待3秒)
异步函数执行结束
30
主函数执行
'''

例子中,hello1是一个耗时3s的异步任务,main也是一个异步方法,但是main需要调用hello1的返回值,所以必须登台hello1执行完成才能继续执行main,这说明异步也是会有阻塞的。

而之前定义的异步函数不用等待是因为事件循环将所有的异步操作‘gather’起来,在多个操作间不同的游走切换,来回调用所有没有等待。

也可以理解为,事件循环只有一个异步操作在处理,没有可以切换执行的目标,所以只能等待当前的操作完成。

多线程+asyncio解决调用时freezing

为了让一个协程函数在不同的线程中执行,我们可以使用以下两个函数:

  1. loop.call_soon_threadsafe(callback, *args),这是一个很底层的API接口,一般很少使用
  2. asyncio.run_coroutine_threadsafe(coroutine,loop) 第一个参数为需要异步执行的协程函数,第二个loop参数为在新线程中创建的事件循环loop,注意一定要是在新线程中创建哦,该函数的返回值是一个concurrent.futures.Future类的对象,用来获取协程的返回结果。 future = asyncio.run_coroutine_threadsafe(coro_func(), loop) 在新线程中运行协程result = future.result()等待获取Future的结果

示例代码:

import asyncio 

import asyncio,time,threading

#需要执行的耗时异步任务
async def func(num):
    print(f'准备调用func,大约耗时{num}')
    await asyncio.sleep(num)
    print(f'耗时{num}之后,func函数运行结束')

#定义一个专门创建事件循环loop的函数,在另一个线程中启动它
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

#定义一个main函数
def main():
    coroutine1 = func(3)
    coroutine2 = func(2)
    coroutine3 = func(1)

    new_loop = asyncio.new_event_loop()                        #在当前线程下创建时间循环,(未启用),在start_loop里面启动它
    t = threading.Thread(target=start_loop,args=(new_loop,))   #通过当前线程开启新的线程去启动事件循环
    t.start()

    asyncio.run_coroutine_threadsafe(coroutine1,new_loop)  #这几个是关键,代表在新线程中事件循环不断“游走”执行
    asyncio.run_coroutine_threadsafe(coroutine2,new_loop)
    asyncio.run_coroutine_threadsafe(coroutine3,new_loop)

    for i in "iloveu":
        print(str(i)+"    ")

if __name__ == "__main__":
    main()

'''运行结果为:
i    准备调用func,大约耗时3
l    准备调用func,大约耗时2
o    准备调用func,大约耗时1
v
e
u
耗时1之后,func函数运行结束
耗时2之后,func函数运行结束
耗时3之后,func函数运行结束
'''

第一步:定义需要异步执行的一系列操作,及一系列协程函数;

第二步:在主线程中定义一个新的线程,然后在新线程中产生一个新的事件循环;

第三步:在主线程中,通过asyncio.run_coroutine_threadsafe(coroutine,loop)这个方法,将一系列异步方法注册到新线程的loop里面去,这样就是新线程负责事件循环的执行。

使用asyncio实现一个timer 定时器

所谓的timer指的是,指定一个时间间隔,让某一个操作隔一个时间间隔执行一次,如此周而复始。很多编程语言都提供了专门的timer实现机制、包括C++、C#等。但是 Python 并没有原生支持 timer,不过可以用 asyncio.sleep 模拟。大致的思想如下,将timer定义为一个异步协程,然后通过事件循环去调用这个异步协程,让事件循环不断在这个协程中反反复调用,只不过隔几秒调用一次即可。简单的实现如下(本例基于python3.7:

import asyncio
async def delay(time):
    await asyncio.sleep(time)

async def timer(time,function):
    while True:
        future=asyncio.ensure_future(delay(time))
        await future
        future.add_done_callback(function)

def func(future):
    print('done')

if __name__=='__main__':
    asyncio.run(timer(2,func))

aiohttp模块

asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。

asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。

  • 安装

    pip3 install aiohttp

  • 示例代码

import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(body=b'<h1>Index</h1>')

async def hello(request):
    await asyncio.sleep(0.5)
    text = '<h1>hello, {}!</h1>'.format(request.match_info['name'])
    return web.Response(body=text.encode('utf-8'))

async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/hello/{name}', hello)
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
    print('Server started at http://127.0.0.1:8000...')
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

aiomysql

Python3.7+ 下的一个异步操作mysql数据的模块,官方地址

示例:

#coding:utf-8

import aiomysql
import asyncio
import logging
import traceback
'''
mysql 异步版本
'''

logobj = logging.getLogger('mysql')

class Pmysql:
    __connection = None

    def __init__(self):
        self.cursor = None
        self.connection = None

    @staticmethod
    async def getconnection():
        if Pmysql.__connection == None:
            conn = await aiomysql.connect(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='123456',
                db='mytest',
                )
            if conn:
                Pmysql.__connection = conn
                return conn
            else:
                raise("connect to mysql error ")
        else:
            return Pmysql.__connection

    async def query(self,query,args=None):
        self.cursor = await self.connection.cursor()
        await self.cursor.execute(query,args)
        r = await self.cursor.fetchall()
        await self.cursor.close()
        return r


async def test():
    conn = await Pmysql.getconnection()
    mysqlobj.connection = conn
    await conn.ping()
    r = await mysqlobj.query("select * from person")
    for i in r:
        print(i)
    conn.close()

if __name__ == '__main__':
    mysqlobj = Pmysql()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())

aioredis

redis异步操作库,官方地址

示例:

import aioredis
import asyncio

class Redis:
    _redis = None

    async def get_redis_pool(self, *args, **kwargs):
        if not self._redis:
            self._redis = await aioredis.create_redis_pool(*args, **kwargs)
        return self._redis

    async def close(self):
        if self._redis:
            self._redis.close()
            await self._redis.wait_closed()


async def get_value(key):
    redis = Redis()
    r = await redis.get_redis_pool(('127.0.0.1', 6379), db=7, encoding='utf-8')
    value = await r.get(key)
    print(f'{key!r}: {value!r}')
    await redis.close()         

if __name__ == '__main__':
    asyncio.run(get_value('key'))  # need python3.7

测试

单元测试

单元测试是用来对一个模块、一个函数或者一个类来进行正确性检验的测试工作。

Python自带的unittest模块可以很方便的让我们编写单元测试。

编写单元测试时,我们需要编写一个测试类,从unittest.TestCase继承。
以test开头的方法就是测试方法,不以test开头的方法不被认为是测试方法,测试的时候不会被执行。

对每一类测试都需要编写一个test_xxx()方法。由于unittest.TestCase提供了很多内置的条件判断,我们只需要调用这些方法就可以断言输出是否是我们所期望的。

代码示例:

'''
定义一个要测试的类
mydict.py
'''
class MyDict(dict):

    def __init__(self, **kw):
        super().__init__(**kw)

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"'Dict' object has no attribute '%s'" % key)

    def __setattr__(self, key, value):
        self[key] = value

编写单元测试:

import unittest

from mydict import MyDict

class TestDict(unittest.TestCase):

    def test_init(self):
        d = MyDict(a=1, b='test')
        self.assertEqual(d.a, 1)
        self.assertEqual(d.b, 'test')
        self.assertTrue(isinstance(d, dict))

    def test_key(self):
        d = MyDict()
        d['key'] = 'value'
        self.assertEqual(d.key, 'value')

    def test_attr(self):
        d = MyDict()
        d.key = 'value'
        self.assertTrue('key' in d)
        self.assertEqual(d['key'], 'value')

    def test_keyerror(self):
        d = MyDict()
        with self.assertRaises(KeyError):
            value = d['empty']

    def test_attrerror(self):
        d = MyDict()
        with self.assertRaises(AttributeError):
            value = d.empty

运行单元测试

一旦编写好单元测试,我们就可以运行单元测试。最简单的运行方式是在mydict_test.py的最后加上两行代码:

if __name__ == '__main__':
    unittest.main()

另一种方法是在命令行通过参数-m unittest直接运行单元测试:

python -m unittest mydict_test

这是推荐的做法,因为这样可以一次批量运行很多单元测试,并且,有很多工具可以自动来运行这些单元测试。

setUp() tearDown()在每次执行之前准备环境,或者在每次执行完之后需要进行一些清理。比如执行前需要连接数据库,执行完成之后需要还原数据、断开连接。

如果想要在所有case执行之前准备一次环境,并在所有case执行结束之后再清理环境,我们可以用 setUpClass() 与 tearDownClass()

跳过某个case需要用到skip装饰器一共有三个:unittest.skip(reason)、unittest.skipIf(condition, reason)、unittest.skipUnless(condition, reason),skip无条件跳过,skipIf当condition为True时跳过,skipUnless当condition为False时跳过。

在VS Code中对Python进行单元测试

Python扩展支持使用Python的内置unittest框架以及pytest和Nose进行单元测试。要使用pytest和Nose,必须将它们安装到当前的Python环境中(即,在pythonPath设置中标识的环境,请参阅环境)。

使用Python:Discover Unit Tests根据当前所选测试框架的发现模式扫描项目以进行测试(请参阅测试发现。一旦发现,VS Code提供了多种运行测试的方法(请参阅运行测试)。

单元测试输出显示在Python Test Log面板中,包括未安装测试框架时导致的错误。

在settings.json中进行设置:

{
    "python.pythonPath": "/usr/local/bin/python3",
    "python.testing.unittestEnabled": true,
    "python.testing.unittestArgs": [
        "-v",
        "-s",
        "./src/tests",
        "-p",
        "test_*.py"
    ],
    "python.testing.pytestEnabled": false,
    "python.testing.nosetestsEnabled": false,
}

Unittest配置设置

设置 默认 描述
unittestEnabled false 指定是否为单元测试启用UnitTest。
unittestArgs [“-v”, “-s”, “.”, “-p”, “test.py”] 传递给unittest的参数,其中由空格分隔的每个元素是列表中的单独项。有关默认值的说明,请参见下文。
CWD 空值 指定单元测试的可选工作目录。
outputWindow “Python Test Log” 用于单元测试输出的窗口。
promptToConfigure true 指定VS代码是否在发现潜在测试时提示配置测试框架。
DEBUGPORT 3000 用于调试UnitTest测试的端口号。
autoTestDiscoverOnSaveEnabled true 指定在保存单元测试文件时是启用还是禁用自动运行测试发现。

UnitTest的默认参数如下:

-v设置默认详细程度。删除此参数以获得更简单的输出。

-s .指定用于发现测试的起始目录。如果您在“test”文件夹中进行了测试,则可以将其更改为-s test(”-s”, “test”在arguments数组中)。

-p test.py是用于查找测试的发现模式。在这种情况下,它.py是包含单词“test” 的任何文件。如果以不同的方式命名测试文件,例如在每个文件名后附加“_test”,则使用类似于*_test.py数组的相应参数的模式。

要在第一次失败时停止测试运&ZeroWidthSpace;&ZeroWidthSpace;行,请将fail fast选项添加”-f”到arguments数组中。

文档测试

如果你经常阅读Python的官方文档,可以看到很多文档都有示例代码。比如re模块就带了很多示例代码:

>>> import re
>>> m = re.search('(?<=abc)def', 'abcdef')
>>> m.group(0)
'def'

可以把这些示例代码在Python的交互式环境下输入并执行,结果与文档中的示例代码显示的一致。

这些代码与其他说明可以写在注释中,然后,由一些工具来自动生成文档。既然这些代码本身就可以粘贴出来直接运行,那么,可不可以自动执行写在注释中的这些代码呢?

答案是肯定的。

当我们编写注释时,如果写上这样的注释:

def abs(n):
    '''
    Function to get absolute value of number.
    
    Example:
    
    >>> abs(1)
    1
    >>> abs(-1)
    1
    >>> abs(0)
    0
    '''
    return n if n >= 0 else (-n)

无疑更明确地告诉函数的调用者该函数的期望输入和输出。

并且,Python内置的“文档测试”(doctest)模块可以直接提取注释中的代码并执行测试。

doctest严格按照Python交互式命令行的输入和输出来判断测试结果是否正确。只有测试异常的时候,可以用…表示中间一大段烦人的输出。

小结

今天主要学习了Pyton的异步编程,并简单了解了下相关的常用模块。并针对单元测试进行了详细的了解,单元测试很重要。明天打算开始学习下Pyton的函数式编程。