Asyncio并发编程

我喜欢铁路,你沿着铁路走,在尽头肯定会找到一座城市,或者其他什么有人的地方。不像鸟飞在空中,甚至不知道前面会不会有目的地。

在此之后可以尝试阅读Python 异步协程概念,协程是一个很大的框架,需要从基础原理慢慢学习。

Asyncio

在python3.5之前,都是使用生成器的一些技巧完成协程任务,他们的调度方式依然是 事件循环+协程模式。这样设计结构和维护虽然相对于回调函数简单一些,但是代码还是有一些混乱,并且又当作生成器又当作协程,都是还是一些技巧性的东西,为了将语义变得更加明确,于是在python3.5使用了async和await(功能与yield from类似)关键词正式定义原生协程,asyncio是python解决异步io编程的一个完整框架。

它具有如下定义:

  1. 包含各种特定系统实现的模块化事件循环
  2. 传输与协议抽象
  3. 对TCP,UDP,SSL,子进程,延时调用以及其他的具体支持
  4. 模仿futures模块适用于事件循环使用到Future类
  5. 基于yield from的协议和任务,可以使用顺序执行的方式编写并发代码
  6. 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池
  7. 模仿threading模块中的同步语法,可以用在单线程内实现协程同步

协程编程离不开的三大要点:

  1. 事件循环
  2. 回调(驱动生成器)
  3. epoll/select(IO多路复用)

Asyncio是一个异步编程的框架,可以解决异步编程,协程调度问题,线程问题,是整个异步IO的解决方案。

事件循环

简单案例(访问一个网站)

async def get_url_title(url):
# 使用关键词async定义一个协程
    print('开始访问网站:{}'.format(url))
    await asyncio.sleep(2)
    # 这一步至关重要
    # asyncio.sleep(2) 功能:异步非阻塞等待2s,作用是模拟访问网站消耗的时间
    # await 的作用类似 yield,即这个时候把线程资源控制权交出去,监听这个描述符直到这个任务完成
    # await 后面只能接三种类型
    '''
    1. 协程:Python 协程属于 可等待 对象,因此可以在其他协程中被等待:
    2. 任务:任务 被用来设置日程以便 并发 执行协程。(当一个协程通过 asyncio.create_task() 等函数被打包为一个 任务,该协程将自动排入日程准备立即运行)
    3. Future 对象:Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。(当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。)

    如果await time.sleep(2) 是会报错的
    '''
    print('网站访问成功')

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # 一行代码创造事件循环
    loop.run_until_complete(get_url_title('http://www.langzi.fun'))
    # 这是一个阻塞的方法,可以理解成多线程中的join方法
    # 直到get_url_title('http://www.langzi.fun')完成后,才会继续执行下面的代码
    end_time = time.time()
    print('消耗时间:{}'.format(end_time-start_time))

返回结果:

开始访问网站:http://www.langzi.fun
网站访问成功
消耗时间:2.0018768310546875

简单案例(访问多个网站)

协程的优势是多任务协作,单任务访问网站没法发挥出他的功能,一次性访问多个网站或者一次性等待多个IO响应时间才能发挥它的优势。

# -*- coding:utf-8 -*-
import asyncio
import time

async def get_url_title(url):
    print('开始访问网站:{}'.format(url))
    await asyncio.sleep(2)
    print('网站访问成功')

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # 创造一个事件循环
    tasks = [get_url_title('http://www.langzi.fun')for i in range(10)]
    # 这个列表代表总任务量,即执行10次get_url_title()函数
    loop.run_until_complete(asyncio.wait(tasks))
    # asyncio.wait后面接上非空可迭代对象,一般来说是功能函数列表
    # 功能是一次性提交多个任务,等待完成
    # loop.run_until_complete(asyncio.gather(*tasks))
    # 和上面代码功能一致,但是gather更加高级,如果是列表就需要加上*
    # 这里会等到全部的任务执行完后才会执行后面的代码
    end_time = time.time()
    print('消耗时间:{}'.format(end_time-start_time))

对一个网站发起10次请求,返回结果:

开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
消耗时间:2.0015649795532227

gather与wait的区别:

  • gather更擅长于将函数聚合在一起
  • wait更擅长筛选运行状况

即gather更加高级,他可以将任务分组,也可以取消任务

import asyncio

async def get_url_title(url):
    print('开始访问网站:{}'.format(url))
    await asyncio.sleep(2)
    print('网站访问成功')
    return 'success'

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # 使用wait方法
    # tasks = [get_url_title('http://www.langzi.fun')for i in range(10)]
    # loop.run_until_complete(asyncio.wait(tasks))

    # 使用gather方法实现分组导入(方法1)
    group1 = [get_url_title('http://www.langzi.fun')for i in range(3)]
    group2 = [get_url_title('http://www.baidu.com')for i in range(5)]
    loop.run_until_complete(asyncio.gather(*group1,*group2))
    # 这种方法会把两个全部一次性导入

    # 使用gather方法实现分组导入(方法2)
    group1 = [get_url_title('http://www.langzi.fun')for i in range(3)]
    group2 = [get_url_title('http://www.baidu.com')for i in range(5)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    #group2.cancel() 取消group2任务
    loop.run_until_complete(asyncio.gather(group1,group2))
    # 这种方法会先把group1导入,然后导入group2

返回结果:

开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.baidu.com
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功

另外一种使用gather获取返回结果:

import asyncio

async def get_url_title(url):
    print('开始访问网站:{}'.format(url))
    await asyncio.sleep(2)
    print('网站访问成功')
    return 'success'

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # 使用gather方法传递任务获取结果
    group1 = asyncio.ensure_future(get_url_title('http://www.langzi.fun'))
    loop.run_until_complete(asyncio.gather(group1))
    # 如果不是列表就不需要加*
    print(group1.result())

返回结果:

开始访问网站:http://www.langzi.fun
网站访问成功
success

还有一些复杂的区别转移到python 异步协程中查看

协程的调用和组合十分灵活,尤其是对于结果的处理,如何返回,如何挂起,需要逐渐积累经验和前瞻的设计。

简单案例(获取返回值)

# -*- coding:utf-8 -*-
import asyncio
import time

async def get_url_title(url):
    print('开始访问网站:{}'.format(url))
    await asyncio.sleep(2)
    print('网站访问成功')
    return 'success'

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # 创建一个事件循环

    get_future = loop.create_task(get_url_title('http://www.langzi.fun'))
    #get_future = asyncio.ensure_future(get_url_title('http://www.langzi.fun'))
    # 这两行代码功能用法一模一样

    loop.run_until_complete(get_future)
    print('获取结果:{}'.format(get_future.result()))
    # 获取结果

    end_time = time.time()
    print('消耗时间:{}'.format(end_time-start_time))

返回结果:

开始访问网站:http://www.langzi.fun
网站访问成功
获取结果:success
消耗时间:2.0019724369049072

如果是多个网址传入,访问多个网址的返回值呢?只需要把前面的知识点汇总一起即可使用:

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # 创建一个事件循环

    tasks = [loop.create_task(get_url_title('http://www.langzi.fun')) for i in range(10)]
    # 把所有要返回的函数加载到一个列表

    loop.run_until_complete(asyncio.wait(tasks))
    # 这里和上面用法一样

    print('获取结果:{}'.format([x.result() for x in tasks]))
    # 因为结果都在一个列表,在列表中取值即可

    end_time = time.time()
    print('消耗时间:{}'.format(end_time-start_time))

返回结果:

开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
获取结果:['success', 'success', 'success', 'success', 'success', 'success', 'success', 'success', 'success', 'success']
消耗时间:2.0016491413116455

简单案例(回调函数)

上面的例子是一个协程函数,当这个协程函数的await xxx执行完毕后,想要执行另一个函数后,然后再返回这个协程函数的返回结果该这么做:

# -*- coding:utf-8 -*-
import asyncio
from functools import partial
# partial的功能是 固定函数参数,返回一个新的函数。你可以这么理解:
'''
from functools import partial
    def go(x,y):
        return x+y
    g = partial(go,y=2)
    print(g(1))
返回结果:3

    g = partial(go,x=5,y=2)
    print(g())
返回结果:7

'''
async def get_url_title(url):
    print('开始访问网站:{}'.format(url))
    await asyncio.sleep(2)
    print('网站访问成功')
    # 当这个协程函数快要结束返回值的时候,会调用下面的call_back函数
    # 等待call_back函数执行完毕后,才返回这个协程函数的值
    return 'success'

def call_back(future,url):
    # 注意这里必须要传递future参数,因为这里的future即代表下面的get_future对象
    print('检测网址:{}状态正常'.format(url))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # 创建一个事件循环

    get_future = loop.create_task(get_url_title('http://www.langzi.fun'))
    # 将一个任务注册到loop事件循环中

    get_future.add_done_callback(partial(call_back,url = 'http://www.langzi.fun'))
    # 这里是设置,当上面的任务完成要返回结果的时候,执行call_back函数
    # 注意call_back函数不能加上(),也就意味着你只能依靠partial方法进行传递参数

    loop.run_until_complete(get_future)
    # 等待任务完成
    print('获取结果:{}'.format(get_future.result()))
    # 获取结果

返回结果:

开始访问网站:http://www.langzi.fun
网站访问成功
检测网址:http://www.langzi.fun状态正常
获取结果:success

梳理

  1. 协程函数必须要使用关键词async定义
  2. 如果遇到了要等待的对象,必须要使用await
  3. 使用await后面的任务,必须是可等待对象(三种主要类型: 协程, 任务 和 Future.)
  4. 运行前,必须要创建一个事件循环(loop = asyncio.get_event_loop(),一行代码即可)
  5. 然后把任务加载到该事件循环中即可
  6. 如果需要获取协程函数的返回值,需要使用loop.create_task()或asyncio.ensure_future()函数,在最后使用.result()获取返回结果。
  7. 如果想要把多个任务注册到loop中,需要使用一个列表包含他们,调用的时候使用asyncio.wait(list)

取消协程任务

存在多个任务协程,想使用ctrl c退出协程,使用例子讲解:

import asyncio
async def get_time_sleep(t):
    print('开始运行,等待:{}s'.format(t))
    await asyncio.sleep(t)
    print('运行结束')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # 创建一个事件循环
    task_1 = get_time_sleep(1)
    task_2 = get_time_sleep(2)
    task_3 = get_time_sleep(3)

    tasks = [task_1,task_2,task_3]
    # 三个协程任务加载到一个列表

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt:
        # 当检测到键盘输入 ctrl c的时候
        all_tasks = asyncio.Task.all_tasks()
        # 获取注册到loop下的所有task
        for task in all_tasks:
            print('开始取消协程')
            task.cancel()
            # 取消该协程,如果取消成功则返回True
        loop.stop()
        # 停止循环
        loop.run_forever()
        # loop事件循环一直运行
        # 这两步必须要做
    finally:
        loop.close()
        # 关闭事件循环

run_forever 会一直运行,直到 stop 被调用,但是你不能像下面这样调 stop

loop.run_forever()
loop.stop()

run_forever 不返回,stop 永远也不会被调用。所以,只能在协程中调 stop:

async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')
    loop.stop()

这样并非没有问题,假如有多个协程在 loop 里运行:

asyncio.ensure_future(do_some_work(loop, 1))
asyncio.ensure_future(do_some_work(loop, 3))

loop.run_forever()

第二个协程没结束,loop 就停止了——被先结束的那个协程给停掉的。
要解决这个问题,可以用 gather 把多个协程合并成一个 future,并添加回调,然后在回调里再去停止 loop。

async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

def done_callback(loop, futu):
    loop.stop()

loop = asyncio.get_event_loop()

futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))
futus.add_done_callback(functools.partial(done_callback, loop))

loop.run_forever()

其实这基本上就是 run_until_complete 的实现了,run_until_complete 在内部也是调用 run_forever。

关于loop.close(),简单来说,loop 只要不关闭,就还可以再运行。

loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()

但是如果关闭了,就不能再运行了:

loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3))  # 此处异常

梳理

  1. 通过gather()启动的协程任务,是可以直接取消的,并且还能获取取消是否成功
  2. 可以通过 asyncio.Task.all_tasks()获取所有的协程任务
  3. 如果使用run_forever()的话会一直运行,只能通过loop.stop()停止

协程相互嵌套

import asyncio
async def sum_tion(x,y):
    print('开始执行传入参数相加:{} + {}'.format(x,y))
    await asyncio.sleep(1)
    # 模拟等待1S
    return (x+y)

async def print_sum(x,y):
    result = await sum_tion(x,y)
    print(result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    loop.run_until_complete(print_sum(1000,2000))

    loop.close()

返回结果:

开始执行传入参数相加:1000 + 2000
3000

执行流程:

  1. run_until_complete运行,会注册task(协程:print_sum)并开启事件循环
  2. print_sum协程中嵌套了子协程,此时print_sum协程暂停(类似委托生成器),转到子协程(协程:sum_tion)中运行代码,期间子协程需sleep1秒钟,直接将结果反馈到event loop中,即将控制权转回调用方,而中间的print_sum暂停不操作
  3. 1秒后,调用方将控制权给到子协程(调用方与子协程直接通信),子协程执行接下来的代码,直到再遇到wait(此实例没有)
  4. 最后执行到return语句,子协程向上级协程(print_sum抛出异常:StopIteration),同时将return返回的值返回给上级协程(print_sum中的result接收值),print_sum继续执行暂时时后续的代码,直到遇到return语句
  5. 向 event loop 抛出StopIteration异常,此时协程任务都已经执行完毕,事件循环执行完成(event loop :the loop is stopped),close事件循环。

如果想要获取协程嵌套函数返回的值,就必须使用回调:

import asyncio
async def sum_tion(x,y)->int:
    print('开始执行传入参数相加:{} + {}'.format(x,y))
    await asyncio.sleep(1)
    # 模拟等待1S
    return (x+y)

async def print_sum(x,y):
    result = await sum_tion(x,y)
    return result

def callback(future):
    return future.result()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    future = loop.create_task(print_sum(100,200))
    # 如果想要获取嵌套协程返回的值,就必须使用回调

    future.add_done_callback(callback)
    loop.run_until_complete(future)

    print(future.result())

    loop.close()

返回结果:

开始执行传入参数相加:100 + 200
300

定时启动任务

asyncio提供定时启动协程任务,通过call_soon,call_later,call_at实现,他们的区别如下:

call_soon

call_soon是立即执行

def callback(sleep_times):
    print("预计消耗时间 {} s".format(sleep_times))
def stoploop(loop):
    print('时间消耗完毕')
    loop.stop()


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # 创建一个事件循环
    loop.call_soon(callback,5)
    # 立即启动callback函数
    loop.call_soon(stoploop,loop)
    # 上面执行完毕后,立即启动执行stoploop函数
    loop.run_forever()
    #要用这个run_forever运行,因为没有传入协程
    print('总共耗时:{}'.format(time.time()-start_time))

返回结果:

预计消耗时间 5 s
时间消耗完毕
总共耗时:0.0010013580322265625

call_later

call_later是设置一定时间启动执行

def callback(sleep_times):
    print("预计消耗时间 {} s".format(sleep_times))
def stoploop(loop):
    print('时间消耗完毕')
    loop.stop()


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()


    loop.call_later(1,callback,1.0)
    # 等待1秒后执行callback函数,传入参数是1.0
    loop.call_later(5,stoploop,loop)
    # 等待5秒后执行stoploop函数,传入参数是loop

    loop.run_forever()
    print('总共耗时:{}'.format(time.time()-start_time))

返回结果:

预计消耗时间 1.0 s
时间消耗完毕
总共耗时:5.002613544464111

call_at

call_at类似与call_later,但是他指定的时间不再是传统意义上的时间,而是loop的内部时钟时间,效果和call_later一样, call_later内部其实调用了call_later

import time
import asyncio

def callback(loop):
    print("传入loop.time()时间为: {} s".format(loop.time()))
def stoploop(loop):
    print('时间消耗完毕')
    loop.stop()


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()

    now = loop.time()
    # loop内部的时钟时间
    loop.call_at(now+1,callback,loop)
    # 等待loop内部时钟时间加上1s后,执行callba函数,传入参数为loop
    loop.call_at(now+3,callback,loop)
    # 等待loop内部时钟时间加上3s后,执行callba函数,传入参数为loop
    loop.call_at(now+5,stoploop,loop)
    # 等待loop内部时钟时间加上1s后,执行stoploop函数,传入参数为loop

返回结果:

传入loop.time()时间为: 3989.39 s
传入loop.time()时间为: 3991.39 s
时间消耗完毕
总共耗时:5.002060174942017

call_soon_threadsafe 线程安全的call_soon

call_soon_threadsafe用法和call_soon一致。但在涉及多线程时, 会使用它.

梳理

  1. call_soon直接启动
  2. call_later自己定时启动
  3. call_at根据loop.time()内部的时间,设置等待时间启动
  4. call_soon_threadsafe和call_soon方法一致,是保证线程安全的
  5. 他们都是比较底层的,在正常使用时很少用到。

结合线程池

Asyncio是异步IO编程的解决方案,异步IO是包括多线程,多进程,和协程的。所以asyncio是可以完成多线程多进程和协程的,在开头说到,协程是单线程的,如果遇到阻塞的话,会阻塞所有的代码任务,所以是不能加入阻塞IO的,但是比如requests库是阻塞的,socket如果不设置setblocking(false)的话,也是阻塞的,这个时候可以放到一个线程中去做也是可以解决的,即在协程中集成阻塞IO,就加入多线程一起解决问题。

用requests完成异步编程(使用线程池)

from concurrent.futures import ThreadPoolExecutor
import requests
import asyncio
import time
import re

def get_url_title(url):
    # 功能是获取网址的标题
    r = requests.get(url)
    try:
        title = re.search('<title>(.*?)</title>',r.content.decode(),re.S|re.I).group(1)
    except Exception as e:
        title = e
    print(title)

if __name__ == '__main__':
    start_time = time.time()

    loop = asyncio.get_event_loop()
    # 创建一个事件循环
    p = ThreadPoolExecutor(5)
    # 创建一个线程池,开启5个线程
    tasks = [loop.run_in_executor(p,get_url_title,'http://www.langzi.fun')for i in range(10)]
    # 这一步很重要,使用loop.run_in_executor()函数:内部接受的是阻塞的线程池,执行的函数,传入的参数
    # 即对网站访问10次,使用线程池访问
    loop.run_until_complete(asyncio.wait(tasks))
    # 等待所有的任务完成
    print(time.time()-start_time)

返回结果:

 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
5.589502334594727

访问10次消耗时间为5.5s,尝试将 p = ThreadPoolExecutor(10),线程数量设置成10个线程,消耗时间为4.6s,改用从进程池p = ProcessPoolExecutor(10),也是一样可以运行的,不过10个进程消耗时间也是5.5s,并且消耗更多的CPU资源。

### 用socket完成异步编程(使用线程池)

import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse
import time
import re


def get_url(url):
    # 通过socket请求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = '/'

    # 建立socket连接
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, 80))
    client.send(
        "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break
    data = data.decode('utf8')
    html_data = data.split('\r\n\r\n')[1]
    # 把请求头信息去掉, 只要网页内容
    title = re.search('<title>(.*?)</title>',html_data,re.S|re.I).group(1)
    print(title)
    client.close()


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    p = ThreadPoolExecutor(3)  # 线程池 放3个线程
    tasks = [loop.run_in_executor(p,get_url,'http://www.langzi.fun') for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    print('last time:{}'.format(time.time() - start_time))

返回结果:

 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
last time:5.132313966751099

使用socket完成http请求(未使用线程池)

import asyncio
from urllib.parse import urlparse
import time


async def get_url(url):
    # 通过socket请求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = '/'

    # 建立socket连接
    reader, writer = await asyncio.open_connection(host, 80)  # 协程 与服务端建立连接
    writer.write(
        "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    all_lines = []
    async for raw_line in reader:  # __aiter__ __anext__魔法方法
        line = raw_line.decode('utf8')
        all_lines.append(line)
    html = '\n'.join(all_lines)
    return html


async def main():
    tasks = []
    tasks = [asyncio.ensure_future(get_url('http://www.langzi.fun')) for i in range(10)]
    for task in asyncio.as_completed(tasks):  # 完成一个 print一个
        result = await task
        print(result)

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print('last time:{}'.format(time.time() - start_time))

asyncio协程和之前讲解的select事件循环原理是一样的

梳理

  1. 协程中遇到必须要使用阻塞任务的时候,可以把阻塞代码放到线程池中运行
  2. 线程池中的代码放到loop.run_in_executor()里面,并且所有任务保存到列表
  3. 最后通过loop.run_until_complate(asyncio.wait(任务列表))中运行
  4. asyncio能通过socket实现与服务端建立连接

与多进程的结合

既然异步协程和多进程对网络请求都有提升,那么为什么不把二者结合起来呢?在最新的 PyCon 2018 上,来自 Facebook 的 John Reese 介绍了 asyncio 和 multiprocessing 各自的特点,并开发了一个新的库,叫做 aiomultiprocess

这个库的安装方式是:

pip3 install aiomultiprocess

需要 Python 3.6 及更高版本才可使用。

使用这个库,我们可以将上面的例子改写如下:

import asyncio
import aiohttp
import time
from aiomultiprocess import Pool

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result

async def request():
    url = 'http://127.0.0.1:5000'
    urls = [url for _ in range(100)]
    async with Pool() as pool:
        result = await pool.map(get, urls)
        return result

coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)

end = time.time()
print('Cost time:', end - start)

这样就会同时使用多进程和异步协程进行请求,但在真实情况下,我们在做爬取的时候遇到的情况千变万化,一方面我们使用异步协程来防止阻塞,另一方面我们使用 multiprocessing 来利用多核成倍加速,节省时间其实还是非常可观的。

同步与通信

和多线程多进程任务一样,协程也可以实现和需要进行同步与通信。

简单例子(顺序启动多任务)

协程是单线程的,他的执行依赖于事件循环中最后的loop.run_until_complate()

import asyncio

num = 0

async def add():
    global num
    for i in range(10):
        await asyncio.sleep(0.1)
        num += i
async def desc():
    global num
    for i in range(10):
        await asyncio.sleep(0.2)
        num -= i

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [add(),desc()]
    loop.run_until_complete(asyncio.wait(tasks))
    # 这里执行顺序是先执行add函数,然后执行desc函数
    # 所以最后的结果是0
    loop.close()
    print(num)

返回结果:

0

这里使用一个共有变量,协程下不需要加锁。

简单例子(Lock(锁))

# -*- coding:utf-8 -*-
import asyncio
import functools


def unlock(lock):
    print('线程锁释放成功')
    lock.release()


async def test(locker, lock):
    print(f'{locker} 等待线程锁释放')
    # ---------------------------------
    # with await lock:
    #     print(f'{locker} 线程锁上锁')
    # ---------------------------------
    # 上面这两行代码等同于:
    # ---------------------------------
    # await lock.acquire()
    # print(f'{locker} 线程锁上锁')
    # lock.release()
    # ---------------------------------
    await lock.acquire()
    print(f'{locker} 线程锁上锁')
    lock.release()
    print(f'{locker} 线程锁释放')


async def main(loop):
    lock = asyncio.Lock()
    await lock.acquire()
    loop.call_later(0.5, functools.partial(unlock, lock))
    # call_later() 表达推迟一段时间的回调, 第一个参数是以秒为单位的延迟, 第二个参数是回调函数
    await asyncio.wait([test('任务 1 ', lock), test('任务 2', lock)])


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

返回结果:

任务 1  等待线程锁释放
任务 2 等待线程锁释放
线程锁释放成功
任务 1  线程锁上锁
任务 1  线程锁释放
任务 2 线程锁上锁
任务 2 线程锁释放

简单例子(Semaphore(信号量))

可以使用 Semaphore(信号量) 来控制并发访问的数量:

import asyncio
from aiohttp import ClientSession


async def fetch(sem,url):
    async with sem:
        # 最大访问数
        async with ClientSession() as session:
            async with session.get(url) as response:
                    status = response.status
                    res = await response.text()
                    print("{}:{} ".format(response.url, status))
                    return res

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    url = "http://www.langzi.fun"
    sem = asyncio.Semaphore(1000)
    # 设置最大并发数为1000
    tasks = [loop.create_task(fetch(sem,url))for i in range(100)]
    # 对网站访问100次
    loop.run_until_complete(asyncio.wait(tasks))

简单例子(Condition(条件))

import asyncio


async def consumer(cond, name, second):
    # 消费者函数
    await asyncio.sleep(second)
    # 等待延迟
    with await cond:
        await cond.wait()
        print('{}: 得到响应'.format(name))


async def producer(cond):
    await asyncio.sleep(2)
    for n in range(1, 3):
        with await cond:
            print('生产者 {} 号'.format(n))
            cond.notify(n=n) # 挨个通知单个消费者
        await asyncio.sleep(0.1)


async def producer2(cond):
    await asyncio.sleep(2)
    with await cond:
        print('释放信号量,通知所有消费者')
        cond.notify_all()
        # 一次性通知全部的消费者


async def main(loop):
    condition = asyncio.Condition()
    # 设置信号量
    task = loop.create_task(producer(condition))
    # producer 和 producer2 是两个协程, 不能使用 call_later(), 需要用到 create_task() 把它们创建成一个 task
    consumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))]
    await asyncio.wait(consumers)
    task.cancel()
    print('---分割线---')
    task = loop.create_task(producer2(condition))
    consumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))]
    await asyncio.wait(consumers)
    task.cancel()
    # 取消任务


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

返回结果:

生产者 1 号
c1: 得到响应
生产者 2 号
c2: 得到响应
---分割线---
释放信号量,通知所有消费者
c1: 得到响应
c2: 得到响应

简单例子(Event(事件))

与 Lock(锁) 不同的是, 事件被触发的时候, 两个消费者不用获取锁, 就要尽快地执行下去了

import asyncio
import functools


def set_event(event):
    print('开始设置事件')
    event.set()


async def test(name, event):
    print('{} 的事件未设置'.format(name))
    await event.wait()
    print('{} 的事件已设置'.format(name))


async def main(loop):
    event = asyncio.Event()
    # 声明事件
    print('事件是否设置: {}'.format(event.is_set()))
    loop.call_later(0.1, functools.partial(set_event, event))
    # 在0.1s后执行set_event()函数,对事件进行设置
    await asyncio.wait([test('e1', event), test('e2', event)])
    print('最终事件状态: {}'.format(event.is_set()))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

返回结果:

事件是否设置: False
e1 的事件未设置
e2 的事件未设置
开始设置事件
e1 的事件已设置
e2 的事件已设置
最终事件状态: True

简单例子(协程间通信)

协程是单线程,因此使用list、dict就可以实现通信,而不会有线程安全问题,当然可以使用asyncio.Queue

from asyncio import Queue
queue = Queue(maxsize=3)   
# queue的put和get需要用await

举个例子:

import asyncio
from asyncio import Queue
import random
import string
q = Queue(maxsize=100)

async def add():
    while 1:
        await q.put(random.choice(string.ascii_letters))

async def desc():
    while 1:
        res = await q.get()
        print(res)
        await asyncio.sleep(1)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([add(),desc()]))
    loop.run_forever()

返回结果:

D
b
S
x
...

加速asyncio

uvloop,这个使用库可以有效的加速asyncio,本库基于libuv,也就是nodejs用的那个库。使用它也非常方便,不过目前不支持windows

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

没错就是2行代码,就可以提速asyncio。

tokio同样可以做异步资源循环

import tokio
asyncio.set_event_loop_policy(tokio.EventLoopPolicy())

Aiohttp

aiohttp是异步非阻塞的http请求库,结合协程一起才能在web请求发挥出极大的优势。

aiohttp基础用法

aiohttp分为服务器端和客户端,本文只介绍客户端。

案例:

import aiohttp
async def job(session):
    response = await session.get(URL)       # 等待并切换
    return str(response.url)


async def main(loop):
    async with aiohttp.ClientSession() as session:      # 官网推荐建立 Session 的形式
        tasks = [loop.create_task(job(session)) for _ in range(2)]
        finished, unfinished = await asyncio.wait(tasks)
        all_results = [r.result() for r in finished]    # 获取所有结果
        print(all_results)

t1 = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
print("Async total time:", time.time() - t1)

"""
['https://morvanzhou.github.io/', 'https://morvanzhou.github.io/']
Async total time: 0.11447715759277344
"""

我们刚刚创建了一个 Session, 这是官网推荐的方式, 但是我觉得也可以直接用 request 形式, 细节请参考官方说明. 如果要获取网页返回的结果, 我们可以在 job() 中 return 个结果出来, 然后再在 finished, unfinished = await asyncio.wait(tasks) 收集完成的结果, 这里它会返回完成的和没完成的, 我们关心的都是完成的, 而且 await 也确实是等待都完成了才返回. 真正的结果被存放在了 result() 里面.

aiohttp安装

pip3 install aiohttp

基本请求用法

async with aiohttp.get('https://github.com') as r:
        await r.text()

其中r.text(), 可以在括号中指定解码方式,编码方式,例如

await resp.text(encoding='windows-1251')

或者也可以选择不编码,适合读取图像等,是无法编码的

await resp.read()

发起一个session请求

首先是导入aiohttp模块:

import aiohttp

然后我们试着获取一个web源码,这里以GitHub的公共Time-line页面为例:

async with aiohttp.ClientSession() as session:
    async with session.get('https://api.github.com/events') as resp:
        print(resp.status)
        print(await resp.text())

上面的代码中,我们创建了一个 ClientSession 对象命名为session,然后通过session的get方法得到一个 ClientResponse 对象,命名为resp,get方法中传入了一个必须的参数url,就是要获得源码的http url。至此便通过协程完成了一个异步IO的get请求。
有get请求当然有post请求,并且post请求也是一个协程:

session.post('http://httpbin.org/post', data=b'data')

用法和get是一样的,区别是post需要一个额外的参数data,即是需要post的数据。
除了get和post请求外,其他http的操作方法也是一样的:

session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

小记:
不要为每次的连接都创建一次session,一般情况下只需要创建一个session,然后使用这个session执行所有的请求。

每个session对象,内部包含了一个连接池,并且将会保持连接和连接复用(默认开启)可以加快整体的性能。

在URL中传递参数

我们经常需要通过 get 在url中传递一些参数,参数将会作为url问号后面的一部分发给服务器。在aiohttp的请求中,允许以dict的形式来表示问号后的参数。举个例子,如果你想传递 key1=value1 key2=value2 到 httpbin.org/get 你可以使用下面的代码:

params = {'key1': 'value1', 'key2': 'value2'}
async with session.get('http://httpbin.org/get',
                       params=params) as resp:
                       assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'

可以看到,代码正确的执行了,说明参数被正确的传递了进去。不管是一个参数两个参数,还是更多的参数,都可以通过这种方式来传递。除了这种方式之外,还有另外一个,使用一个 list 来传递(这种方式可以传递一些特殊的参数,例如下面两个key是相等的也可以正确传递):

params = [('key', 'value1'), ('key', 'value2')]
async with session.get('http://httpbin.org/get',
                       params=params) as r:
    assert r.url == 'http://httpbin.org/get?key=value2&key=value1'

除了上面两种,我们也可以直接通过传递字符串作为参数来传递,但是需要注意,通过字符串传递的特殊字符不会被编码:

async with session.get('http://httpbin.org/get',
                       params='key=value+1') as r:
        assert r.url == 'http://httpbin.org/get?key=value+1'

响应的内容

还是以GitHub的公共Time-line页面为例,我们可以获得页面响应的内容:

async with session.get('https://api.github.com/events') as resp:
    print(await resp.text())

运行之后,会打印出类似于如下的内容:

'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

resp的text方法,会自动将服务器端返回的内容进行解码–decode,当然我们也可以自定义编码方式:

await resp.text(encoding='gb2312')

除了text方法可以返回解码后的内容外,我们也可以得到类型是字节的内容:

print(await resp.read())

运行的结果是:

b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

gzip和deflate转换编码已经为你自动解码。

小记:

text(),read()方法是把整个响应体读入内存,如果你是获取大量的数据,请考虑使用”字节流“(streaming response)

特殊响应内容:json

如果我们获取的页面的响应内容是json,aiohttp内置了更好的方法来处理json:

async with session.get('https://api.github.com/events') as resp:
    print(await resp.json())

如果因为某种原因而导致resp.json()解析json失败,例如返回不是json字符串等等,那么resp.json()将抛出一个错误,也可以给json()方法指定一个解码方式:

print(await resp.json(
encoding='gb2312')) 

或者传递一个函数进去:

print(await resp.json( lambda(x:x.replace('a','b')) ))

以字节流的方式读取响应内容

虽然json(),text(),read()很方便的能把响应的数据读入到内存,但是我们仍然应该谨慎的使用它们,因为它们是把整个的响应体全部读入了内存。即使你只是想下载几个字节大小的文件,但这些方法却将在内存中加载所有的数据。所以我们可以通过控制字节数来控制读入内存的响应内容:

async with session.get('https://api.github.com/events') as resp:
    await resp.content.read(10) #读取前10个字节

一般地,我们应该使用以下的模式来把读取的字节流保存到文件中:

with open(filename, 'wb') as fd:
    while True:
        chunk = await resp.content.read(chunk_size)
        if not chunk:
            break
        fd.write(chunk)

自定义请求头

如果你想添加请求头,可以像get添加参数那样以dict的形式,作为get或者post的参数进行请求:

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
headers = {'content-type': 'application/json'}

await session.post(url,
                   data=json.dumps(payload),
                   headers=headers)

自定义Cookie

给服务器发送cookie,可以通过给 ClientSession 传递一个cookie参数:

url = 'http://httpbin.org/cookies'
cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:
    async with session.get(url) as resp:
        assert await resp.json() == {
           "cookies": {"cookies_are": "working"}}

可直接访问链接 “httpbin.org/cookies”查看当前cookie,访问session中的cookie请见第10节。

忽略SSL证书

在requests中,通过设置verify=False来忽略,在aiohttp中,这么设置即可:

async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
    async session.get(url) as resp:
        pass

禁止跳转也是在相同的位置设置

allow_redirects = False

SSL加密请求

有的请求需要验证加密证书,可以设置ssl=False,取消验证

r = await session.get('https://example.com', ssl=False)

加入证书

sslcontext = ssl.create_default_context(
   cafile='/path/to/ca-bundle.crt')
r = await session.get('https://example.com', ssl=sslcontext)

限制同时请求数量

imit默认是100,limit=0的时候是无限制

conn = aiohttp.TCPConnector(limit=30)

post数据的几种方式

(1)模拟表单post数据

payload = {'key1': 'value1', 'key2': 'value2'}
async with session.post('http://httpbin.org/post',data=payload) as resp:
    print(await resp.text())

注意:data=dict的方式post的数据将被转码,和form提交数据是一样的作用,如果你不想被转码,可以直接以字符串的形式 data=str 提交,这样就不会被转码。

(2)post json

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}

async with session.post(url, data=json.dumps(payload)) as resp:
    ...

其实json.dumps(payload)返回的也是一个字符串,只不过这个字符串可以被识别为json格式

(3)post 小文件

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}

await session.post(url, data=files)
可以设置好文件名和content-type:
url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file',
               open('report.xls', 'rb'),
               filename='report.xls',
               content_type='application/vnd.ms-excel')

await session.post(url, data=data)

如果将文件对象设置为数据参数,aiohttp将自动以字节流的形式发送给服务器。

(4)post 大文件

aiohttp支持多种类型的文件以流媒体的形式上传,所以我们可以在文件未读入内存的情况下发送大文件。

@aiohttp.streamer
def file_sender(writer, file_name=None):
    with open(file_name, 'rb') as f:
        chunk = f.read(2**16)
        while chunk:
            yield from writer.write(chunk)
            chunk = f.read(2**16)

# Then you can use `file_sender` as a data provider:

async with session.post('http://httpbin.org/post',data=file_sender(file_name='huge_file')) as resp:
    print(await resp.text())

同时我们可以从一个url获取文件后,直接post给另一个url,并计算hash值:

async def feed_stream(resp, stream):
    h = hashlib.sha256()

    while True:
        chunk = await resp.content.readany()
        if not chunk:
            break
        h.update(chunk)
        stream.feed_data(chunk)

    return h.hexdigest()

resp = session.get('http://httpbin.org/post')
stream = StreamReader()
loop.create_task(session.post('http://httpbin.org/post', data=stream))
file_hash = await feed_stream(resp, stream)

因为响应内容类型是StreamReader,所以可以把get和post连接起来,同时进行post和get:

r = await session.get('http://python.org')
await session.post('http://httpbin.org/post',data=r.content)

(5)post预压缩数据

在通过aiohttp发送前就已经压缩的数据, 调用压缩函数的函数名(通常是deflate 或 zlib)作为content-encoding的值:

async def my_coroutine(session, headers, my_data):
    data = zlib.compress(my_data)
    headers = {'Content-Encoding': 'deflate'}
    async with session.post('http://httpbin.org/post',
                            data=data,
                            headers=headers)
        pass

keep-alive, 连接池,共享cookie

ClientSession 用于在多个连接之间共享cookie:

async with aiohttp.ClientSession() as session:
    await session.get(
        'http://httpbin.org/cookies/set?my_cookie=my_value')
    filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
    assert filtered['my_cookie'].value == 'my_value'
    async with session.get('http://httpbin.org/cookies') as r:
        json_body = await r.json()
        assert json_body['cookies']['my_cookie'] == 'my_value'

也可以为所有的连接设置共同的请求头:

async with aiohttp.ClientSession(
    headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session:
    async with session.get("http://httpbin.org/headers") as r:
        json_body = await r.json()
        assert json_body['headers']['Authorization'] == \
            'Basic bG9naW46cGFzcw=='

ClientSession 还支持 keep-alive连接和连接池(connection pooling)

cookie安全性

默认ClientSession使用的是严格模式的 aiohttp.CookieJar. RFC 2109,明确的禁止接受url和ip地址产生的cookie,只能接受 DNS 解析IP产生的cookie。可以通过设置aiohttp.CookieJar 的 unsafe=True 来配置:

jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)

控制同时连接的数量(连接池)

也可以理解为同时请求的数量,为了限制同时打开的连接数量,我们可以将限制参数传递给连接器:

conn = aiohttp.TCPConnector(limit=30)#同时最大进行连接的连接数为30,默认是100,limit=0的时候是无限制

限制同时打开限制同时打开连接到同一端点的数量((host, port, is_ssl) 三的倍数),可以通过设置 limit_per_host 参数:

conn = aiohttp.TCPConnector(limit_per_host=30)#默认是0

自定义域名解析

我们可以指定域名服务器的 IP 对我们提供的get或post的url进行解析:

from aiohttp.resolver import AsyncResolver
resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
conn = aiohttp.TCPConnector(resolver=resolver)

设置代理

aiohttp支持使用代理来访问网页:

async with aiohttp.ClientSession() as session:
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com") as resp:
        print(resp.status)

当然也支持需要授权的页面:

async with aiohttp.ClientSession() as session:
    proxy_auth = aiohttp.BasicAuth('user', 'pass')
    async with session.get("http://python.org",proxy="http://some.proxy.com",proxy_auth=proxy_auth) as resp:
        print(resp.status)

或者通过这种方式来验证授权:

session.get("http://python.org",proxy="http://user:pass@some.proxy.com")

响应状态码 response status code

可以通过 resp.status来检查状态码是不是200:

async with session.get('http://httpbin.org/get') as resp:
    assert resp.status == 200

响应头

我们可以直接使用 resp.headers 来查看响应头,得到的值类型是一个dict:

>>> resp.headers
{'ACCESS-CONTROL-ALLOW-ORIGIN': '*',
 'CONTENT-TYPE': 'application/json',
 'DATE': 'Tue, 15 Jul 2014 16:49:51 GMT',
 'SERVER': 'gunicorn/18.0',
 'CONTENT-LENGTH': '331',
 'CONNECTION': 'keep-alive'}

或者我们可以查看原生的响应头:

>>> resp.raw_headers
((b'SERVER', b'nginx'),
 (b'DATE', b'Sat, 09 Jan 2016 20:28:40 GMT'),
 (b'CONTENT-TYPE', b'text/html; charset=utf-8'),
 (b'CONTENT-LENGTH', b'12150'),
 (b'CONNECTION', b'keep-alive'))

重定向的响应头

如果一个请求被重定向了,我们依然可以查看被重定向之前的响应头信息:

>>> resp = await session.get('http://example.com/some/redirect/')
>>> resp
<ClientResponse(http://example.com/some/other/url/) [200]>
>>> resp.history
(<ClientResponse(http://example.com/some/redirect/) [301]>,)

超时处理

默认的IO操作都有5分钟的响应时间 我们可以通过 timeout 进行重写:

async with session.get('https://github.com', timeout=60) as r:
    ...

其他优秀的AIO库

aio_mysql

支持mysql异步连接

依赖

  1. python3.4+
  2. mysql环境
  3. asyncio
  4. aiomysql

基础用法

import asyncio
from aiomysql import create_pool
# 导入创建数据库连接
loop = asyncio.get_event_loop()
# 创建一个事件循环

async def go():
    async with create_pool(host='127.0.0.1', port=3306,user='root',password='root',db='mysql', loop=loop) as pool:
    # 请求连接到数据库
        async with pool.get() as conn:
        # 获取到数据库连接的游标
            async with conn.cursor() as cur:
            # 操作sql数据库
                await cur.execute("show databases;")
                # 获取结果事件,此时用await等待
                value = await cur.fetchall()
                # 等待获取结果
                print(value)
loop.run_until_complete(go())

返回结果:

(('information_schema',), ('challenges',), ('mysql',), ('nikes',), ('performance_schema',), ('security',), ('test',), ('url',), ('yolanda_information_collection_099',))

连接池

import asyncio
import aiomysql

async def test_example(loop):
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,user='root', password='root',db='mysql', loop=loop)
    # 定义创建一个mysql连接池
    async with pool.acquire() as conn:
    # 每次请求操作sql数据库的时候都要上锁,保证线程安全
        async with conn.cursor() as cur:
        # 操作sql数据库
            await cur.execute("show databases;")
            # 等待获取哦结果
            print(cur.description)
            r = await cur.fetchone()
            # 等待获取结果
            print (r)
    pool.close()
    await pool.wait_closed()

loop = asyncio.get_event_loop()
loop.run_until_complete(test_example(loop))

返回结果:

(('Database', 253, None, 256, 256, 0, False),)
('information_schema',)

aioredis

支持redis异步连接

依赖

  1. python3.5+
  2. hiredis
  3. redis数据库环境

基础用法

import asyncio
import aioredis

loop = asyncio.get_event_loop()

async def go():
    conn = await aioredis.create_connection('redis://localhost', loop=loop)
    # 等待连接redis数据库
    await conn.execute('set', 'my-key', 'value')
    # 等待执行查询语句
    val = await conn.execute('get', 'my-key')
    # 等待获取结果
    print(val)
    conn.close()
    # 关闭连接
    await conn.wait_closed()
    # 必须要等待关闭数据库连接
loop.run_until_complete(go())

当然还有另一种写法也能达到同样的效果:

import asyncio
import aioredis

loop = asyncio.get_event_loop()

async def go():
    redis = await aioredis.create_redis(
        'redis://localhost', loop=loop)
    await redis.set('my-key', 'value')
    val = await redis.get('my-key')
    print(val)
    redis.close()
    await redis.wait_closed()
loop.run_until_complete(go())

连接池

import asyncio
import aioredis

loop = asyncio.get_event_loop()

async def go():
    pool = await aioredis.create_pool(
        'redis://localhost',
        minsize=5, maxsize=10,
        loop=loop)
    # 创建连接池,设置最大和最小连接数
    await pool.execute('set', 'my-key', 'value')
    # 等待执行查询语句
    print(await pool.execute('get', 'my-key'))
    pool.close()
    # 关闭连接池
    await pool.wait_closed()
    # 别忘了等待让所有连接池关闭

loop.run_until_complete(go())

当然连接池还有另一种写法,功能一致:

import asyncio
import aioredis

loop = asyncio.get_event_loop()

async def go():
    redis = await aioredis.create_redis_pool(
        'redis://localhost',
        minsize=5, maxsize=10,
        loop=loop)
    await redis.set('my-key', 'value')
    val = await redis.get('my-key')
    print(val)
    redis.close()
    await redis.wait_closed()
loop.run_until_complete(go())

aioredis 官方文档

aiomultiprocess

上文提起过,基于多进程与协程结合,可以发挥出多核CPU的优势,还有一个aiomultiprocessing的库,功能差不多,但是封装的不是太好,优化也没aiomultiprocess这个好。

依赖

  1. python3.6+
  2. asyncio
  3. aiomultiprocess

基础用法

import asyncio
import aiomultiprocess
import aiohttp

async def get_url_info(url):
    # 这一部分就是发起网络请求,没有aiomultiprocess的参与
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f'{resp.url}:{resp.status}')

async def main():
    # main()函数是核心函数,负责给每个CPU提供任务
    p = aiomultiprocess.Process(target=get_url_info,args=('http://www.langzi.fun',))
    # 传入参数
    await p
    # p是消耗事件的操作,需要使用await
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    # 如果是python3.7的话,就直接使用asyncio.run(mian())即可

返回结果:

http://www.langzi.fun:200

如果是传入一个网址列表的话:

async def main():
    # main()函数是核心函数,负责给每个CPU提供任务
    tasks = ['http://www.langzi.fun' for i in range(10)]
    # tasks列表有10个网址
    for url in tasks:
        p = aiomultiprocess.Process(target=get_url_info,args=(url,))
    # 传入参数
        await p
    # p是消耗事件的操作,需要使用await
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    # 如果是python3.7的话,就直接使用asyncio.run(mian())即可

获取返回结果

# -*- coding:utf-8 -*-
import asyncio
import aiomultiprocess
import aiohttp

async def get_url_info(url):
    # 这一部分就是发起网络请求,没有aiomultiprocess的参与
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()
            #print(f'{resp.url}:{resp.status}')

async def main():
    # main()函数是核心函数,负责给每个CPU提供任务
    p = aiomultiprocess.Worker(target=get_url_info,args=('http://www.langzi.fun',))
    # 传入参数,这里用Worker
    res = await p
    print(res)
    # p是消耗事件的操作,需要使用await
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    # 如果是python3.7的话,就直接使用asyncio.run(mian())即可

返回结果:

<!DOCTYPE html>
<html class="theme-next muse use-motion" lang="zh-Hans">
<head><meta name="generator" content="Hexo 3.8.0">
  <meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" c
....

线程池管理

# -*- coding:utf-8 -*-
import asyncio
import aiomultiprocess
import aiohttp

async def get_url_info(url):
    # 这一部分就是发起网络请求,没有aiomultiprocess的参与
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.read()

async def main():
    # main()函数是核心函数,负责给每个CPU提供任务
    tasks = ['http://www.langzi.fun' for i in range(10)]
    # 10个网址保存到列表

    async with aiomultiprocess.Pool() as pool:
        # 开启进程池
        result = await pool.map(get_url_info,tasks)
        # 这里必须要await,使用pool.map()方法
    print(result)
    # 返回的结果是一个列表

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    # 如果是python3.7的话,就直接使用asyncio.run(mian())即可

返回结果:

[b'<!DOCTYPE html>\n\n\n\n  \n\n\n<html class="theme-next muse use.......

对传参控制

对传递的参数一个是列表,一个是单个的字符串,如何处理?

比如下面的代码:

async def get_image(url:'http://test.com/',dirs:'[a.jpg,b.jpg,c.jpg]'):
    async with aiohttp.ClientSession() as session:
        for dir in dirs:
            async with session.get(url=url+dir) as resp:
                if resp.status == 200:
                    return await resp.content()
                    # only need one exists image url
async def main(url,dirs):
    async with aiomultiprocess.Pool() as pool:
        result = await pool.map(get_image,??,??)
        # hao to set there?
    print(result)

if __name__ == '__main__':
    urls = ['http://1.com/','http://2.com/','http://3.com/','http://4.com/',]
    dirs = ['a.jpg','b.jpg','c.jpg']
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(urls,dirs))

我想到办法是先申明dirs在最前面,然后作为全局变量不传递dirs。

解决方法是:

dirs = ['a.jpg', 'b.jpg', 'c.jpg']
async def get_image(url:'http://test.com/'):
    async with aiohttp.ClientSession() as session:
        for dir in dirs:
            print(url+dir)
            async with session.get(url=url+dir,timeout=3) as resp:
                if resp.status == 200:
                    return await resp.content()
                    # only need one exists image url

async def main():
    urls = urls = ['http://1.com/','http://2.com/','http://3.com/','http://4.com/']
    async with aiomultiprocess.Pool() as pool:
        result = await pool.map(get_image,urls)
    print(result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

该库的作者jreese 告诉我可以使用pool.starmap()的方法来处理参数:

async def get_image(url, dirs):
    ...

async def main():
    urls = [...]
    dirs = [...]
    async with Pool() as pool:
        result = await pool.starmap(get_image, ((url, dirs) for url in urls))

链接

aiofiles

普通的本地文件IO是阻塞的,不能轻易地和可移植地使之成为异步的。这意味着执行文件IO可能会干扰异步IO应用程序,而异步应用程序不应阻塞执行线程。aiofiles通过引入支持将操作委托给单独线程池的文件的异步版本来帮助实现这一点。

基础用法

import asyncio
import aiofiles

async def read_data():
    async with aiofiles.open('example.txt',mode='r',encoding='utf-8')as f:
    # 打开文件,等待读取
        content = await f.readlines()
        # 等待读取结果
        print(content)
loop = asyncio.get_event_loop()
loop.run_until_complete(read_data())

当前目录下有example.txt文件

具有如下方法:

close:关闭
flush:刷新
read:读取文件所有
readline:读取第一行,结果保存到列表
readlines:读取所有行,结果保存到列表
write:写入
writelines:接收参数是一个可迭代对象,写入

Aiodns

Aiodns是在dnspython后支持异步的dns库,一般可用于网址服务器搭建与子域名爆破(通过dns服务器获取数据结果)

本来想通过asyncio+aiomultiprocess+aiodns开发一款异步协程的子域名爆破工具,但是发现已经由蘑菇街的安全开发大佬写出来了,这里我就不重复造轮子了。基于异步协程的子域名爆破工具链接

这里是大佬的博客介绍地址

我拜读完代码后,发现大佬不仅基于字典做优化,还拓展了基于多种搜索引擎(百度,google,bing)和资产搜索引擎(zoomeye,shadon,fofa)的功能,使用uvloop加速,但是这样就只能在*inux系统下运行,如果想要在windows下运行,删除那两行vuloop的代码即可。

这个子域名爆破工具个人认为是目前最好用的。

基础用法

import asyncio
import aiodns

loop = asyncio.get_event_loop()
resolver = aiodns.DNSResolver(loop=loop)
f = resolver.query('google.com','A')
result = loop.run_until_complete(f)
print(result)

AsyncSSH

Python3.4+AsyncIO框架之上提供了sshv2协议的异步客户机和服务器实现。

依赖

  1. python3.4+
  2. cryptography (PyCA) 2.6.1+

基础用法

import asyncio, asyncssh, sys

async def run_client():
    async with asyncssh.connect('localhost') as conn:
        result = await conn.run('echo "Hello!"', check=True)
        print(result.stdout, end='')

try:
    asyncio.get_event_loop().run_until_complete(run_client())
except (OSError, asyncssh.Error) as exc:
    sys.exit('SSH connection failed: ' + str(exc))

janus

一个基于异步的线程安全消息队列,混合同步异步队列,应该用于经典同步(线程)代码和异步(异步)代码之间的通信。提供两个接口:同步和异步接口。同步与标准队列完全兼容,异步队列遵循异步队列设计。

基础用法

# -*- coding:utf-8 -*-
import asyncio
import janus
loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
# queue队列可以设置异步队列和同步队列
# queue.async_q:异步
# queue.sync_q:同步

def threaded(sync_q):
    # 这个函数接受的是同步的队列
    for i in range(10):
        sync_q.put(i)
        # 队列放入数据
    sync_q.join()
    # 等待所有数据存放完毕


async def async_coro(async_q):
    for i in range(10):
        val = await async_q.get()
        print(val)
        # assert val == i
        # 等同于 if val == i:
        if val == i:
            async_q.task_done()
            # 如果获取到了结尾的数字,就关闭这个消息队列


fut = loop.run_in_executor(None, threaded, queue.sync_q)
# 该方法是把阻塞的队列注册加载到loop中,等待完成
loop.run_until_complete(async_coro(queue.async_q))
# 执行async_coro()函数,传入的参数是 queue.async_q (queue队列的异步队列)
loop.run_until_complete(fut)
# 这行代码不要也行

返回结果:

0
1
2
3
4
5
6
7
8
9

aioelasticsearch

支持Elasticsearch的异步库

基础用法

import asyncio

from aioelasticsearch import Elasticsearch

async def go():
    es = Elasticsearch()

    print(await es.search())

    await es.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(go())
loop.close()

异步滚动

import asyncio

from aioelasticsearch import Elasticsearch
from aioelasticsearch.helpers import Scan

async def go():
    async with Elasticsearch() as es:
        async with Scan(
            es,
            index='index',
            doc_type='doc_type',
            query={},
        ) as scan:
            print(scan.total)

            async for doc in scan:
                print(doc['_source'])

loop = asyncio.get_event_loop()
loop.run_until_complete(go())
loop.close()

asyncio 异步

三种异步速度对比

Python Asyncio 资源列表 1

Python Asyncio 资源列表 2

坚持原创技术分享,您的支持将鼓励我继续创作!
------ 本文结束 ------

版权声明

LangZi_Blog's by Jy Xie is licensed under a Creative Commons BY-NC-ND 4.0 International License
由浪子LangZi创作并维护的Langzi_Blog's博客采用创作共用保留署名-非商业-禁止演绎4.0国际许可证
本文首发于Langzi_Blog's 博客( http://langzi.fun ),版权所有,侵权必究。

0%