阿里妹导读:作为在日常开发生产中非常实用的语言,有必要掌握一些python用法,比如爬虫、网络请求等场景,很是实用。但python是单线程的,如何提高python的处理速度,是一个很重要的问题,这个问题的一个关键技术,叫协程。本篇文章,讲讲python协程的理解与使用,主要是针对网络请求这个模块做一个梳理,希望能帮到有需要的同学。
在理解协程这个概念及其作用场景前,先要了解几个基本的关于操作系统的概念,主要是进程、线程、同步、异步、阻塞、非阻塞,了解这几个概念,不仅是对协程这个场景,诸如消息队列、缓存等,都有一定的帮助。接下来,编者就自己的理解和网上查询的材料,做一个总结。在面试的时候,我们都会记住一个概念,进程是系统资源分配的最小单位。是的,系统由一个个程序,也就是进程组成的,一般情况下,分为文本区域、数据区域和堆栈区域。文本区域存储处理器执行的代码(机器码),通常来说,这是一个只读区域,防止运行的程序被意外修改。数据区域存储所有的变量和动态分配的内存,又细分为初始化的数据区(所有初始化的全局、静态、常量,以及外部变量)和为初始化的数据区(初始化为0的全局变量和静态变量),初始化的变量最初保存在文本区,程序启动后被拷贝到初始化的数据区。堆栈区域存储着活动过程调用的指令和本地变量,在地址空间里,栈区紧连着堆区,他们的增长方向相反,内存是线性的,所以我们代码放在低地址的地方,由低向高增长,栈区大小不可预测,随开随用,因此放在高地址的地方,由高向低增长。当堆和栈指针重合的时候,意味着内存耗尽,造成内存溢出。进程的创建和销毁都是相对于系统资源,非常消耗资源,是一种比较昂贵的操作。进程为了自身能得到运行,必须要抢占式的争夺CPU。对于单核CPU来说,在同一时间只能执行一个进程的代码,所以在单核CPU上实现多进程,是通过CPU快速的切换不同进程,看上去就像是多个进程在同时进行。由于进程间是隔离的,各自拥有自己的内存内存资源,相比于线程的共同共享内存来说,相对安全,不同进程之间的数据只能通过 IPC(Inter-Process Communication) 进行通信共享。线程是CPU调度的最小单位。如果进程是一个容器,线程就是运行在容器里面的程序,线程是属于进程的,同个进程的多个线程共享进程的内存地址空间。线程间的通信可以直接通过全局变量进行通信,所以相对来说,线程间通信是不太安全的,因此引入了各种锁的场景,不在这里阐述。当一个线程崩溃了,会导致整个进程也崩溃了,即其他线程也挂了, 但多进程而不会,一个进程挂了,另一个进程依然照样运行。在多核操作系统中,默认进程内只有一个线程,所以对多进程的处理就像是一个进程一个核心。同步和异步关注的是消息通信机制,所谓同步,就是在发出一个函数调用时,在没有得到结果之前,该调用不会返回。一旦调用返回,就立即得到执行的返回值,即调用者主动等待调用结果。所谓异步,就是在请求发出去后,这个调用就立即返回,没有返回结果,通过回调等方式告知该调用的实际结果。同步的请求,需要主动读写数据,并且等待结果;异步的请求,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。所以,区分的条件在于,进程/线程要访问的数据是否就绪,进程/线程是否需要等待。非阻塞一般通过多路复用实现,多路复用有 select、poll、epoll几种实现方式。协程是属于线程的,又称微线程,纤程,英文名Coroutine。举个例子,在执行函数A时,我希望随时中断去执行函数B,然后中断B的执行,切换回来执行A。这就是协程的作用,由调用者自由切换。这个切换过程并不是等同于函数调用,因为它没有调用语句。执行方式与多线程类似,但是协程只有一个线程执行。协程的优点是执行效率非常高,因为协程的切换由程序自身控制,不需要切换线程,即没有切换线程的开销。同时,由于只有一个线程,不存在冲突问题,不需要依赖锁(加锁与释放锁存在很多资源消耗)。协程主要的使用场景在于处理IO密集型程序,解决效率问题,不适用于CPU密集型程序的处理。然而实际场景中这两种场景非常多,如果要充分发挥CPU利用率,可以结合多进程+协程的方式。后续我们会讲到结合点。根据wikipedia的定义,协程是一个无优先级的子程序调度组件,允许子程序在特点的地方挂起恢复。所以理论上,只要内存足够,一个线程中可以有任意多个协程,但同一时刻只能有一个协程在运行,多个协程分享该线程分配到的计算机资源。协程是为了充分发挥异步调用的优势,异步操作则是为了避免IO操作阻塞线程。1)现代主流的操作系统几乎都是分时操作系统,即一台计算机采用时间片轮转的方式为多个用户服务,系统资源分配的基本单位是进程,CPU调度的基本单位是线程。
2)运行时内存空间分为变量区,栈区,堆区。内存地址分配上,堆区从低地到高,栈区从高往低。
3)计算机执行时一条条指令读取执行,执行到当前指令时,下一条指令的地址在指令寄存器的IP中,ESP寄存值指向当前栈顶地址,EBP指向当前活动栈帧的基地址。
4)系统发生函数调用时操作为:先将入参从右往左依次压栈,然后把返回地址压栈,最后将当前EBP寄存器的值压栈,修改ESP寄存器的值,在栈区分配当前函数局部变量所需的空间。
5)协程的上下文包含属于当前协程的栈区和寄存器里面存放的值。
在python3.3中,通过关键字yield from使用协程,在3.5中,引入了关于协程的语法糖async和await,我们主要看async/await的原理解析。其中,事件循环是一个核心所在,编写过 js的同学,会对事件循环Eventloop更加了解, 事件循环是一种等待程序分配事件或消息的编程架构(维基百科)。在python中,asyncio.coroutine 修饰器用来标记作为协程的函数, 这里的协程是和asyncio及其事件循环一起使用的,而在后续的发展中,async/await被使用的越来越广泛。async/await是使用python协程的关键,从结构上来看,asyncio 实质上是一个异步框架,async/await 是为异步框架提供的 API已方便使用者调用,所以使用者要想使用async/await 编写协程代码,目前必须机遇 asyncio 或其他异步库。在实际开发编写异步代码时,为了避免太多的回调方法导致的回调地狱,但又需要获取异步调用的返回结果结果,聪明的语言设计者设计了一个 叫Future的对象,封装了与loop 的交互行为。其大致执行过程为:程序启动后,通过add_done_callback 方法向 epoll 注册回调函数,当 result 属性得到返回值后,主动运行之前注册的回调函数,向上传递给 coroutine。这个Future对象为asyncio.Future。但是,要想取得返回值,程序必须恢复恢复工作状态,而由于Future 对象本身的生存周期比较短,每一次注册回调、产生事件、触发回调过程后工作可能已经完成,所以用 Future 向生成器 send result 并不合适。所以这里又引入一个新的对象 Task,保存在Future 对象中,对生成器协程进行状态管理。Python 里另一个 Future 对象是 concurrent.futures.Future,与 asyncio.Future 互不兼容,容易产生混淆。区别点在于,concurrent.futures 是线程级的 Future 对象,当使用 concurrent.futures.Executor 进行多线程编程时,该对象用于在不同的 thread 之间传递结果。上文中提到,Task是维护生成器协程状态处理执行逻辑的的任务对象,Task 中有一个_step 方法,负责生成器协程与 EventLoop 交互过程的状态迁移,整个过程可以理解为:Task向协程 send 一个值,恢复其工作状态。当协程运行到断点后,得到新的Future对象,再处理 future 与 loop 的回调注册过程。在日常开发中,会有一个误区,认为每个线程都可以有一个独立的 loop。实际运行时,主线程才能通过 asyncio.get_event_loop() 创建一个新的 loop,而在其他线程时,使用 get_event_loop() 却会抛错。正确的做法为通过 asyncio.set_event_loop() ,将当前线程与 主线程的loop 显式绑定。Loop有一个很大的缺陷,就是 loop 的运行状态不受 Python 代码控制,所以在业务处理中,无法稳定的将协程拓展到多线程中运行。介绍完概念和原理,我来看看如何使用,这里,举一个实际场景的例子,来看看如何使用python的协程。外部接收一些文件,每个文件里有一组数据,其中,这组数据需要通过http的方式,发向第三方平台,并获得结果。由于同一个文件的每一组数据没有前后的处理逻辑,在之前通过Requests库发送的网络请求,串行执行,下一组数据的发送需要等待上一组数据的返回,显得整个文件的处理时间长,这种请求方式,完全可以由协程来实现。为了更方便的配合协程发请求,我们使用aiohttp库来代替requests库,关于aiohttp,这里不做过多剖析,仅做下简单介绍。aiohttp是asyncio和Python的异步HTTP客户端/服务器,由于是异步的,经常用在服务区端接收请求,和客户端爬虫应用,发起异步请求,这里我们主要用来发请求。aiohttp支持客户端和HTTP服务器,可以实现单线程并发IO操作,无需使用Callback Hell即可支持Server WebSockets和Client WebSockets,且具有中间件。直接上代码了,talk is cheap, show me the code~import aiohttp
import asyncio
from inspect import isfunction
import time
import logger
@logging_utils.exception(logger)
def request(pool, data_list):
loop = asyncio.get_event_loop()
loop.run_until_complete(exec(pool, data_list))
async def exec(pool, data_list):
tasks = []
sem = asyncio.Semaphore(pool)
for item in data_list:
tasks.append(
control_sem(sem,
item.get("method", "GET"),
item.get("url"),
item.get("data"),
item.get("headers"),
item.get("callback")))
await asyncio.wait(tasks)
async def control_sem(sem, method, url, data, headers, callback):
async with sem:
count = 0
flag = False
while not flag and count < 4:
flag = await fetch(method, url, data, headers, callback)
count = count + 1
print("flag:{},count:{}".format(flag, count))
if count == 4 and not flag:
raise Exception('EAS service not responding after 4 times of retry.')
async def fetch(method, url, data, headers, callback):
async with aiohttp.request(method, url=url, data=data, headers=headers) as resp:
try:
json = await resp.read()
print(json)
if resp.status != 200:
return False
if isfunction(callback):
callback(json)
return True
except Exception as e:
print(e)
这里,我们封装了对外发送批量请求的request方法,接收一次性发送的数据多少,和数据综合,在外部使用时,只需要构建好网络请求对象的数据,设定好请求池大小即可,同时,设置了重试功能,进行了4次重试,防止在网络抖动的时候,单个数据的网络请求发送失败。在使用协程重构网络请求模块之后,当数据量在1000的时候,由之前的816s,提升到424s,快了一倍,且请求池大小加大的时候,效果更明显,由于第三方平台同时建立连接的数据限制,我们设定了40的阀值。可以看到,优化的程度很显著。人生苦短,我用python。协程好不好,谁用谁知道。如果有类似的场景,可以考虑启用,或者其他场景,欢迎留言讨论。理解async/await:
https://segmentfault.com/a/1190000015488033?spm=ata.13261165.0.0.57d41b119Uyp8t
协程概念,原理(c++和node.js实现)
https://cnodejs.org/topic/58ddd7a303d476b42d34c911?spm=ata.13261165.0.0.57d41b119Uyp8t