聲明:本文針對的是python3.4以后的版本的,因?yàn)閺?.4開始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不過語法上面有稍微的改變。比如在3.4版本中使用@asyncio.coroutine裝飾器和yield from語句,但是在3.5以后的版本中使用async、await兩個關(guān)鍵字代替,雖然語法上稍微有所差異,但是原理是一樣的。本文用最通俗的語言解釋了pythonasyncio背后的一些核心概念,簡要解析了asyncio的設(shè)計架構(gòu),并給出了使用python進(jìn)行asyncio異步編程的一般模板。本文較長:閱讀全文約30min。
目錄
一 一些最重要的概念
1.1 協(xié)程(coroutine)——本質(zhì)就是一個
函數(shù)
1.2 事件循環(huán)——event_loop
1.3 什么是awaitable對象——即可暫停
等待的對象
1.4 什么是task任務(wù)
1.5 什么是future?
二 asyncio的基本架構(gòu)
2.1 常見的一些高層API方法
2.2 Task 類詳解
2.3 異步函數(shù)的結(jié)果獲取
三 asyncio異步編程的基本模板
3.1 python3.7之前的版本
3.1.1 例子一:無參數(shù)、無返回值
3.1.2 例子二:有參數(shù)、有返回值
3.1.3 總結(jié):四步走(針對python3.7
之前的版本)
3.2 python3.7版本
3.2.1 例子一:無參數(shù)、無返回值
3.2.2 例子二:有參數(shù)、有返回值
3.2.3 總結(jié):兩步走(針對python3.7)
四 協(xié)程編程的優(yōu)點(diǎn)
01
一些最重要的概念
協(xié)程(coroutine)——本質(zhì)就是一個函數(shù)
那么協(xié)程函數(shù)有什么作用呢?
作用一:返回future的結(jié)果。什么是future?后面會講到。當(dāng)協(xié)程函數(shù)執(zhí)行到這一句,協(xié)程會被懸掛起來,知道future的結(jié)果被返回。如果是future被中途取消,則會觸發(fā)CancelledError異常。由于task是future的子類,后面也會介紹,關(guān)于future的所有應(yīng)用,都同樣適用于task
事件循環(huán)——event_loop
import time
import asyncio
async def say_after_time(delay,what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f'開始時間為: {time.time()}')
await say_after_time(1,'hello')
await say_after_time(2,'world')
print(f'結(jié)束時間為: {time.time()}')
loop=asyncio.get_event_loop() #創(chuàng)建事件循環(huán)對象
#loop=asyncio.new_event_loop() #與上面等價,創(chuàng)建新的事件循環(huán)
loop.run_until_complete(main()) #通過事件循環(huán)對象運(yùn)行協(xié)程函數(shù)
loop.close()
在python3.6版本中,如果我們單獨(dú)像執(zhí)行普通函數(shù)那樣執(zhí)行一個協(xié)程函數(shù),只會返回一個coroutine對象(python3.7)如下所示:
>>> main()
<coroutine object main at 0x1053bb7c8>
(1)獲取事件循環(huán)對象的幾種方式:
下面幾種方式可以用來獲取、設(shè)置、創(chuàng)建事件循環(huán)對象loop
loop=asyncio.get_running_loop() 返回(獲?。┰诋?dāng)前線程中正在運(yùn)行的事件循環(huán),如果沒有正在運(yùn)行的事件循環(huán),則會顯示錯誤;它是python3.7中新添加的loop=asyncio.get_event_loop() 獲得一個事件循環(huán),如果當(dāng)前線程還沒有事件循環(huán),則創(chuàng)建一個新的事件循環(huán)loop;
loop=asyncio.set_event_loop(loop) 設(shè)置一個事件循環(huán)為當(dāng)前線程的事件循環(huán);
loop=asyncio.new_event_loop() 創(chuàng)建一個新的事件循環(huán)
(2)通過事件循環(huán)運(yùn)行協(xié)程函數(shù)的兩種方式:
①方式一:創(chuàng)建事件循環(huán)對象loop,即asyncio.get_event_loop(),通過事件循環(huán)運(yùn)行協(xié)程函數(shù)
②方式二:直接通過asyncio.run(function_name)運(yùn)行協(xié)程函數(shù)。但是需要注意的是,首先run函數(shù)是python3.7版本新添加的,前面的版本是沒有的;其次,這個run函數(shù)總是會創(chuàng)建一個新的事件循環(huán)并在run結(jié)束之后關(guān)閉事件循環(huán),所以,如果在同一個線程中已經(jīng)有了一個事件循環(huán),則不能再使用這個函數(shù)了,因?yàn)橥粋€線程不能有兩個事件循環(huán),而且這個run函數(shù)不能同時運(yùn)行兩次,因?yàn)樗呀?jīng)創(chuàng)建一個了。即同一個線程中是不允許有多個事件循環(huán)loop的。
asyncio.run()是python3.7 新添加的內(nèi)容,也是后面推薦的運(yùn)行任務(wù)的方式,因?yàn)樗歉邔覣PI,后面會講到它與asyncio.run_until_complete()的差異性,run_until_complete()是相對較低層的API。
注意:到底什么是事件循環(huán)?如何理解?
可以這樣理解:線程一直在各個協(xié)程方法之間永不停歇的游走,遇到一個yield from 或者await就懸掛起來,然后又走到另外一個方法,依次進(jìn)行下去,知道事件循環(huán)所有的方法執(zhí)行完畢。實(shí)際上loop是BaseEventLoop的一個實(shí)例,我們可以查看定義,它到底有哪些方法可調(diào)用。
什么是awaitable對象——即可暫停等待的對象
什么是task任務(wù)
方法一:task = asyncio.create_task(coro()) # 這是3.7版本新添加的
什么是future?
02
asyncio的基本架構(gòu)
前面介紹了asyncio里面最為核心的幾個概念,如果能夠很好地理解這些概念,對于學(xué)習(xí)協(xié)程是非常有幫助的,但是按照我個人的風(fēng)格,我會先說asyncio的架構(gòu),理解asyncio的設(shè)計架構(gòu)有助于更好地應(yīng)用和理解。
asyncio分為高層API和低層API,我們都可以使用,就像我前面在講matplotlib的架構(gòu)的時候所講的一樣,我們前面所講的Coroutine和Tasks屬于高層API,而Event Loop 和Future屬于低層API。當(dāng)然asyncio所涉及到的功能遠(yuǎn)不止于此,我們只看這么多。下面是是高層API和低層API的概覽:
High-level APIs
●Coroutines and Tasks(本文要寫的)
●Streams
●Synchronization Primitives
●Subprocesses
●Queues
●Exceptions
Low-level APIs
●Event Loop(下一篇要寫的)
●Futures
●Transports and Protocols
●Policies
●Platform Support
所謂的高層API主要是指那些asyncio.xxx()的方法
常見的一些高層API方法
(1)運(yùn)行異步協(xié)程
asyncio.run(coro, *, debug=False) #運(yùn)行一個一步程序,參見上面
(2)創(chuàng)建任務(wù)
task=asyncio.create_task(coro) #python3.7 ,參見上面
task = asyncio.ensure_future(coro())
(3)睡眠
await asyncio.sleep(delay, result=None, *, loop=None)
這個函數(shù)表示的是:當(dāng)前的那個任務(wù)(協(xié)程函數(shù))睡眠多長時間,而允許其他任務(wù)執(zhí)行。這是它與time.sleep()的區(qū)別,time.sleep()是當(dāng)前線程休息,注意他們的區(qū)別哦。
另外如果提供了參數(shù)result,當(dāng)當(dāng)前任務(wù)(協(xié)程)結(jié)束的時候,它會返回;
loop參數(shù)將會在3.10中移除,這里就不再說了。
(4)并發(fā)運(yùn)行多個任務(wù)
await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
它本身也是awaitable的。
*coros_or_futures是一個序列拆分操作,如果是以個協(xié)程函數(shù),則會自動轉(zhuǎn)換成Task。
當(dāng)所有的任務(wù)都完成之后,返回的結(jié)果是一個列表的形式,列表中值的順序和*coros_or_futures完成的順序是一樣的。
return_exceptions:
False,這是他的默認(rèn)值,第一個出發(fā)異常的任務(wù)會立即返回,然后其他的任務(wù)繼續(xù)執(zhí)行;
True,對于已經(jīng)發(fā)生了異常的任務(wù),也會像成功執(zhí)行了任務(wù)那樣,等到所有的任務(wù)執(zhí)行結(jié)束一起將錯誤的結(jié)果返回到最終的結(jié)果列表里面。
如果gather()本身被取消了,那么綁定在它里面的任務(wù)也就取消了。
(5)防止任務(wù)取消
await asyncio.shield(*arg, *, loop=None)
它本身也是awaitable的。顧名思義,shield為屏蔽、保護(hù)的意思,即保護(hù)一個awaitable 對象防止取消,一般情況下不推薦使用,而且在使用的過程中,最好使用try語句塊更好。
try:
res = await shield(something())
except CancelledError:
res = None
(6)設(shè)置timeout——一定要好好理解
await asyncio.
wait_for
(aw, timeout, *, loop=None)
如果aw是一個協(xié)程函數(shù),會自動包裝成一個任務(wù)task。參見下面的例子:
import asyncio
async def eternity():
print('我馬上開始執(zhí)行')
await asyncio.sleep(3600) #當(dāng)前任務(wù)休眠1小時,即3600秒
print('終于輪到我了')
async def main():
# Wait for at most 1 second
try:
print('等你3秒鐘哦')
await asyncio.wait_for(eternity(), timeout=3) #休息3秒鐘了執(zhí)行任務(wù)
except asyncio.TimeoutError:
print('超時了!')
asyncio.run(main())
'''運(yùn)行結(jié)果為:
等你3秒鐘哦
我馬上開始執(zhí)行
超時了!
'''
為什么?首先調(diào)用main()函數(shù),作為入口函數(shù),當(dāng)輸出‘等你3秒鐘哦’之后,main掛起,執(zhí)行eternity,然后打印‘我馬上開始執(zhí)行’,然后eternity掛起,而且要掛起3600秒,大于3,這時候出發(fā)TimeoutError。修改一下:‘’
import asyncio
async def eternity():
print('我馬上開始執(zhí)行')
await asyncio.sleep(2) #當(dāng)前任務(wù)休眠2秒鐘,2<3
print('終于輪到我了')
async def main():
# Wait for at most 1 second
try:
print('等你3秒鐘哦')
await asyncio.wait_for(eternity(), timeout=3) #給你3秒鐘執(zhí)行你的任務(wù)
except asyncio.TimeoutError:
print('超時了!')
asyncio.run(main())
'''運(yùn)行結(jié)果為:
等你3秒鐘哦
我馬上開始執(zhí)行
終于輪到我了
'''
總結(jié):當(dāng)異步操作需要執(zhí)行的時間超過waitfor設(shè)置的timeout,就會觸發(fā)異常,所以在編寫程序的時候,如果要給異步操作設(shè)置timeout,一定要選擇合適,如果異步操作本身的耗時較長,而你設(shè)置的timeout太短,會涉及到她還沒做完,就拋出異常了。
(7)多個協(xié)程函數(shù)時候的等候
await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
與上面的區(qū)別是,第一個參數(shù)aws是一個集合,要寫成集合set的形式,比如:
{func(),func(),func3()}
表示的是一系列的協(xié)程函數(shù)或者是任務(wù),其中協(xié)程會自動包裝成任務(wù)。事實(shí)上,寫成列表的形式也是可以的。
注意:該函數(shù)的返回值是兩個Tasks/Futures的集合:
(done, pending)
其中done是一個集合,表示已經(jīng)完成的任務(wù)tasks;pending也是一個集合,表示還沒有完成的任務(wù)。
常見的使用方法為:done, pending = await asyncio.wait(aws)
參數(shù)解釋:
timeout (a float or int), 同上面的含義一樣,需要注意的是,這個不會觸發(fā)asyncio.TimeoutError異常,如果到了timeout還有任務(wù)沒有執(zhí)行完,那些沒有執(zhí)行完的tasks和futures會被返回到第二個集合pending里面。
return_when參數(shù),顧名思義,他表示的是,什么時候wait函數(shù)該返回值。只能夠去下面的幾個值。
Constant | Description |
FIRST_COMPLETED | first_completes.當(dāng)任何一個task或者是future完成或者是取消,wait函數(shù)就返回 |
FIRST_EXCEPTION | 當(dāng)任何一個task或者是future觸發(fā)了某一個異常,就返回,.如果是所有的task和future都沒有觸發(fā)異常,則等價與下面的 ALL_COMPLETED . |
ALL_COMPLETED | 當(dāng)所有的task或者是future都完成或者是都取消的時候,再返回。 |
如下面例子所示:
import asyncio
import time
a=time.time()
async def hello1(): #大約2秒
print('Hello world 01 begin')
yield from asyncio.sleep(2)
print('Hello again 01 end')
async def hello2(): #大約3秒
print('Hello world 02 begin')
yield from asyncio.sleep(3)
print('Hello again 02 end')
async def hello3(): #大約4秒
print('Hello world 03 begin')
yield from asyncio.sleep(4)
print('Hello again 03 end')
async def main(): #入口函數(shù)
done,pending=await asyncio.wait({hello1(),hello2(),hello3()},return_when=asyncio.FIRST_COMPLETED)
for i in done:
print(i)
for j in pending:
print(j)
asyncio.run(main()) #運(yùn)行入口函數(shù)
b=time.time()
print('---------------------------------------')
print(b-a)
'''運(yùn)行結(jié)果為:
Hello world 02 begin
Hello world 01 begin
Hello world 03 begin
Hello again 01 end
<Task finished coro=<hello1() done, defined at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:46> result=None>
<Task pending coro=<hello3() running at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:61> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394438>()]>>
<Task pending coro=<hello2() running at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:55> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394378>()]>>
---------------------------------------
2.033155679702759
'''
從上面可以看出,hello1()試運(yùn)行結(jié)束了的,hello2()和hello3()還沒結(jié)束。
(8)asyncio.as_completed()函數(shù)
這個函數(shù)我沒有找到合適的中文名稱去描述,所以哪位大神如果知道,望告知,不勝感激!它的函數(shù)原型如下:
asyncio.as_completed(aws, *, loop=None, timeout=None)
第一個參數(shù)aws:同上面一樣,是一個集合{}集合里面的元素是coroutine、task或者future
第三個參數(shù)timeout:意義和上面講的的一樣
那到底什么作用呢?其實(shí)很簡單,個人感覺有點(diǎn)雞肋,從一個例子看起:
import asyncio
import time
import threading
a=time.time()
@asyncio.coroutine
def hello1():
print('Hello world 01 begin')
yield from asyncio.sleep(5) #大約5秒
print('Hello again 01 end')
return '哈哈1'
@asyncio.coroutine
def hello2():
print('Hello world 02 begin')
yield from asyncio.sleep(3) #大約3秒
print('Hello again 02 end')
return '哈哈2'
@asyncio.coroutine
def hello3():
print('Hello world 03 begin')
yield from asyncio.sleep(4) #大約4秒
print('Hello again 03 end')
return '哈哈3'
async def main():
s=asyncio.as_completed({hello1(),hello2(),hello3()})
for f in s:
result=await f
print(result)
asyncio.run(main())
b=time.time()
print('---------------------------------------')
print(b-a)
'''運(yùn)行結(jié)果為:
Hello world 03 begin
Hello world 01 begin
Hello world 02 begin
Hello again 01 end
哈哈1
Hello again 02 end
哈哈2
Hello again 03 end
哈哈3
---------------------------------------
4.0225794315338135
'''
結(jié)論:asyncio.as_completed()函數(shù)返回的是一個可迭代(iterator)的對象,對象的每個元素就是一個future對象,很多小伙伴說,這不是相當(dāng)于沒變嗎?其實(shí)返回的future集合是對參數(shù)的future集合重新組合,組合的順序就是,最先執(zhí)行完的協(xié)程函數(shù)(coroutine、task、future)最先返回,從上面的代碼可知,參數(shù)為
aws={hello1(),hello2(),hello3()},因?yàn)閔ello1大約花費(fèi)5秒、hello2大約花費(fèi)3秒、hello3大約花費(fèi)4秒。返回的結(jié)果為
s={hello2()、hello3()、hello(1)},因?yàn)閔ello2時間最短,故而放在前面,hello1時間最長,故而放在最后面。然后對返回的集合s開始迭代。
Task 類詳解
先來看一下Task類的簡單介紹(英文原文文檔)。
上面的文字描述中推出了幾個非常重要的信息,特在此總結(jié)如下:
(1)他是作為一個python協(xié)程對象,和Future對象很像的這么一個對象,但不是線程安全的;他繼承了Future所有的API,,除了Future.set_result()和Future.set_Exception();
(2)使用高層API asyncio.ccreate_task()創(chuàng)建任務(wù),或者是使用低層API loop.create_task()或者是loop.ensure_future()創(chuàng)建任務(wù)對象;
(3)相比于協(xié)程函數(shù),任務(wù)時有狀態(tài)的,可以使用Task.cancel()進(jìn)行取消,這會觸發(fā)CancelledError異常,使用cancelled()檢查是否取消。
下面介紹Task類常見的一些使用函數(shù)
(1)cancel()
Request the Task to be cancelled.
其實(shí)前面已經(jīng)有所介紹,最好是使用他會出發(fā)CancelledError異常,所以需要取消的協(xié)程函數(shù)里面的代碼最好在try-except語句塊中進(jìn)行,這樣方便觸發(fā)異常,打印相關(guān)信息,但是Task.cancel()沒有辦法保證任務(wù)一定會取消,而Future.cancel()是可以保證任務(wù)一定取消的。可以參見下面的一個例子:
import asyncio
async def cancel_me():
print('cancel_me(): before sleep')
try:
await asyncio.sleep(3600) #模擬一個耗時任務(wù)
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')
async def main():
#通過協(xié)程創(chuàng)建一個任務(wù),需要注意的是,在創(chuàng)建任務(wù)的時候,就會跳入到異步開始執(zhí)行
#因?yàn)槭?.7版本,創(chuàng)建一個任務(wù)就相當(dāng)于是運(yùn)行了異步函數(shù)cancel_me
task = asyncio.create_task(cancel_me())
#等待一秒鐘
await asyncio.sleep(1)
print('main函數(shù)休息完了')
#發(fā)出取消任務(wù)的請求
task.cancel()
try:
await task #因?yàn)槿蝿?wù)被取消,觸發(fā)了異常
except asyncio.CancelledError:
print('main(): cancel_me is cancelled now')
asyncio.run(main())
'''運(yùn)行結(jié)果為:
cancel_me(): before sleep
main函數(shù)休息完了
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now
'''
運(yùn)行過程分析:
首先run函數(shù)啟動主函數(shù)入口main,在main中,因?yàn)榈谝痪湓捑褪钦{(diào)用異步函數(shù)cancel_me()函數(shù),所以先打印出了第一句話;
然后進(jìn)入cancel_me中的try語句,遇到await,暫停,這時候返回main中執(zhí)行,但是有在main中遇到了await,也會暫停,但是由于main中只需要暫停1秒,而cancel_me中要暫停3600秒,所以等到main的暫停結(jié)束后,接著運(yùn)行main,所以打印出第二句話;
接下來遇到取消任務(wù)的請求task.cancel(),然后繼續(xù)執(zhí)行main里面的try,又遇到了await,接著main進(jìn)入暫停,接下來進(jìn)入到cancel_me函數(shù)中,但是由于main中請求了取消任務(wù),所以那個耗時3600秒的任務(wù)就不再執(zhí)行了,直接觸發(fā)了Cancelled_Error異常,打印出第三句話,接下來又raise一個異常信息;
接下來執(zhí)行cancel_me的finally,打印出第四句話,此時cancel_me執(zhí)行完畢,由于他拋出了一個異常,返回到主程序main中,觸發(fā)異常,打印出第五句話。
(2)done()
當(dāng)一個被包裝得協(xié)程既沒有觸發(fā)異常、也沒有被取消的時候,意味著它是done的,返回true。
(3)result()
返回任務(wù)的執(zhí)行結(jié)果,
當(dāng)任務(wù)被正常執(zhí)行完畢,則返回結(jié)果;
當(dāng)任務(wù)被取消了,調(diào)用這個方法,會觸發(fā)CancelledError異常;
當(dāng)任務(wù)返回的結(jié)果是無用的時候,則調(diào)用這個方法會觸發(fā)InvalidStateError;
當(dāng)任務(wù)出發(fā)了一個異常而中斷,調(diào)用這個方法還會再次觸發(fā)這個使程序中斷的異常。
(4)exception()
返回任務(wù)的異常信息,觸發(fā)了什么異常,就返回什么異常,如果任務(wù)是正常執(zhí)行的無異常,則返回None;
當(dāng)任務(wù)被取消了,調(diào)用這個方法會觸發(fā)CancelledError異常;
當(dāng)任務(wù)沒有做完,調(diào)用這個方法會觸發(fā)InvalidStateError異常。
下面還有一些不常用的方法,如下:
(5)add_done_callback(callback, *, context=None)
(6)remove_done_callback(callback)
(7)get_stack(*, limit=None)
(8)print_stack(*, limit=None, file=None)
(9)all_tasks(loop=None),這是一個類方法
(10)current_task(loop=None),這是一個類方法
異步函數(shù)的結(jié)果獲取
對于異步編程、異步函數(shù)而言,最重要的就是異步函數(shù)調(diào)用結(jié)束之后,獲取異步函數(shù)的返回值,我們可以有以下幾種方式來獲取函數(shù)的返回值,第一是直接通過Task.result()來獲??;第二種是綁定一個回調(diào)函數(shù)來獲取,即函數(shù)執(zhí)行完畢后調(diào)用一個函數(shù)來獲取異步函數(shù)的返回值。
(1)直接通過result來獲取
import asyncio
import time
async def hello1(a,b):
print('Hello world 01 begin')
await asyncio.sleep(3) #模擬耗時任務(wù)3秒
print('Hello again 01 end')
return a+b
coroutine=hello1(10,5)
loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán)
task=asyncio.ensure_future(coroutine) #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)列表
loop.run_until_complete(task) #第三步:通過事件循環(huán)運(yùn)行
print('-------------------------------------')
print(task.result())
loop.close()
'''運(yùn)行結(jié)果為
Hello world 01 begin
Hello again 01 end
-------------------------------------
15
'''
(2)通過定義回調(diào)函數(shù)來獲取
import asyncio
import time
async def hello1(a,b):
print('Hello world 01 begin')
await asyncio.sleep(3) #模擬耗時任務(wù)3秒
print('Hello again 01 end')
return a+b
def callback(future): #定義的回調(diào)函數(shù)
print(future.result())
loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán)
task=asyncio.ensure_future(hello1(10,5)) #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)
task.add_done_callback(callback) #并被任務(wù)綁定一個回調(diào)函數(shù)
loop.run_until_complete(task) #第三步:通過事件循環(huán)運(yùn)行
loop.close() #第四步:關(guān)閉事件循環(huán)
'''運(yùn)行結(jié)果為:
Hello world 01 begin
Hello again 01 end
15
'''
注意:所謂的回調(diào)函數(shù),就是指協(xié)程函數(shù)coroutine執(zhí)行結(jié)束時候會調(diào)用回調(diào)函數(shù)。并通過參數(shù)future獲取協(xié)程執(zhí)行的結(jié)果。我們創(chuàng)建的task和回調(diào)里的future對象,實(shí)際上是同一個對象,因?yàn)閠ask是future的子類。
03
asyncio異步編程的基本模板
事實(shí)上,在使用asyncio進(jìn)行異步編程的時候,語法形式往往是多樣性的,雖然理解異步編程的核心思想很重要,但是實(shí)現(xiàn)的時候終究還是要編寫語句的,本次給出的模板,是兩個不同的例子,例子一是三個異步方法,它們都沒有參數(shù),沒有返回值,都模擬一個耗時任務(wù);例子二是三個異步方法,都有參數(shù),都有返回值。
python3.7之前的版本
import asyncio
import time
a=time.time()
async def hello1():
print('Hello world 01 begin')
await asyncio.sleep(3) #模擬耗時任務(wù)3秒
print('Hello again 01 end')
async def hello2():
print('Hello world 02 begin')
await asyncio.sleep(2) #模擬耗時任務(wù)2秒
print('Hello again 02 end')
async def hello3():
print('Hello world 03 begin')
await asyncio.sleep(4) #模擬耗時任務(wù)4秒
print('Hello again 03 end')
loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán)
tasks = [hello1(), hello2(),hello3()] #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)列表
loop.run_until_complete(asyncio.wait(tasks)) #第三步:通過事件循環(huán)運(yùn)行
loop.close() #第四步:取消事件循環(huán)
'''運(yùn)行結(jié)果為:
Hello world 02 begin
Hello world 03 begin
Hello world 01 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
'''
例子二:有參數(shù)、有返回值
import asyncio
import time
async def hello1(a,b):
print('Hello world 01 begin')
await asyncio.sleep(3) #模擬耗時任務(wù)3秒
print('Hello again 01 end')
return a+b
async def hello2(a,b):
print('Hello world 02 begin')
await asyncio.sleep(2) #模擬耗時任務(wù)2秒
print('Hello again 02 end')
return a-b
async def hello3(a,b):
print('Hello world 03 begin')
await asyncio.sleep(4) #模擬耗時任務(wù)4秒
print('Hello again 03 end')
return a*b
loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán)
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
tasks = [task1,task2,task3] #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)列表
loop.run_until_complete(asyncio.wait(tasks)) #第三步:通過事件循環(huán)運(yùn)行
print(task1.result()) #并且在所有的任務(wù)完成之后,獲取異步函數(shù)的返回值
print(task2.result())
print(task3.result())
loop.close() #第四步:關(guān)閉事件循環(huán)
'''運(yùn)行結(jié)果為:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''
總結(jié):四步走(針對python3.7之前的版本)
第一步·:構(gòu)造事件循環(huán)
loop=asyncio.get_running_loop() #返回(獲?。┰诋?dāng)前線程中正在運(yùn)行的事件循環(huán),如果沒有正在運(yùn)行的事件循環(huán),則會顯示錯誤;它是python3.7中新添加的
loop=asyncio.get_event_loop() #獲得一個事件循環(huán),如果當(dāng)前線程還沒有事件循環(huán),則創(chuàng)建一個新的事件循環(huán)loop;
loop=asyncio.set_event_loop(loop) #設(shè)置一個事件循環(huán)為當(dāng)前線程的事件循環(huán);
loop=asyncio.new_event_loop() #創(chuàng)建一個新的事件循環(huán)
第二步:將一個或者是多個協(xié)程函數(shù)包裝成任務(wù)Task
#高層API
task = asyncio.create_task(coro(參數(shù)列表)) # 這是3.7版本新添加的
task = asyncio.ensure_future(coro(參數(shù)列表))
#低層API
loop.create_future(coro)
loop.create_task(coro)
'''需要注意的是,在使用Task.result()獲取協(xié)程函數(shù)結(jié)果的時候,使用asyncio.create_task()卻會顯示錯
但是使用asyncio.ensure_future卻正確,本人暫時不知道原因,哪位大神知道,望告知,不勝感激!'''
第三步:通過事件循環(huán)運(yùn)行
loop.run_until_complete(asyncio.wait(tasks)) #通過asyncio.wait()整合多個task
loop.run_until_complete(asyncio.gather(tasks)) #通過asyncio.gather()整合多個task
loop.run_until_complete(task_1) #單個任務(wù)則不需要整合
loop.run_forever() #但是這個方法在新版本已經(jīng)取消,不再推薦使用,因?yàn)槭褂闷饋聿缓啙?/span>
'''
使用gather或者wait可以同時注冊多個任務(wù),實(shí)現(xiàn)并發(fā),但他們的設(shè)計是完全不一樣的,在前面的2.1.(4)中已經(jīng)討論過了,主要區(qū)別如下:
(1)參數(shù)形式不一樣
gather的參數(shù)為 *coroutines_or_futures,即如這種形式
tasks = asyncio.gather(*[task1,task2,task3])或者
tasks = asyncio.gather(task1,task2,task3)
loop.run_until_complete(tasks)
wait的參數(shù)為列表或者集合的形式,如下
tasks = asyncio.wait([task1,task2,task3])
loop.run_until_complete(tasks)
(2)返回的值不一樣
gather的定義如下,gather返回的是每一個任務(wù)運(yùn)行的結(jié)果,
results = await asyncio.gather(*tasks)
wait的定義如下,返回dones是已經(jīng)完成的任務(wù),pending是未完成的任務(wù),都是集合類型
done, pending = yield from asyncio.wait(fs)
(3)后面還會講到他們的進(jìn)一步使用
'''
簡單來說:async.wait會返回兩個值:done和pending,done為已完成的協(xié)程Task,pending為超時未完成的協(xié)程Task,需通過future.result調(diào)用Task的result。而async.gather返回的是已完成Task的result。
第四步:關(guān)閉事件循環(huán)
loop.close()
'''
以上示例都沒有調(diào)用 loop.close,好像也沒有什么問題。所以到底要不要調(diào) loop.close 呢?
簡單來說,loop 只要不關(guān)閉,就還可以再運(yùn)行:
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()
但是如果關(guān)閉了,就不能再運(yùn)行了:
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3)) # 此處異常
建議調(diào)用 loop.close,以徹底清理 loop 對象防止誤用
'''
python3.7版本
在最新的python3.7版本中,asyncio又引進(jìn)了一些新的特性和API
import asyncio
import time
async def hello1():
print('Hello world 01 begin')
await asyncio.sleep(3) #模擬耗時任務(wù)3秒
print('Hello again 01 end')
async def hello2():
print('Hello world 02 begin')
await asyncio.sleep(2) #模擬耗時任務(wù)2秒
print('Hello again 02 end')
async def hello3():
print('Hello world 03 begin')
await asyncio.sleep(4) #模擬耗時任務(wù)4秒
print('Hello again 03 end')
async def main():
results=await asyncio.gather(hello1(),hello2(),hello3())
for result in results:
print(result) #因?yàn)闆]返回值,故而返回None
asyncio.run(main())
'''運(yùn)行結(jié)果為:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
None
None
None
'''
例子二:有參數(shù)、有返回值
import asyncio
import time
async def hello1(a,b):
print('Hello world 01 begin')
await asyncio.sleep(3) #模擬耗時任務(wù)3秒
print('Hello again 01 end')
return a+b
async def hello2(a,b):
print('Hello world 02 begin')
await asyncio.sleep(2) #模擬耗時任務(wù)2秒
print('Hello again 02 end')
return a-b
async def hello3(a,b):
print('Hello world 03 begin')
await asyncio.sleep(4) #模擬耗時任務(wù)4秒
print('Hello again 03 end')
return a*b
async def main():
results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5))
for result in results:
print(result)
asyncio.run(main())
'''運(yùn)行結(jié)果為:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''
總結(jié):兩步走(針對python3.7)
第一步:構(gòu)建一個入口函數(shù)main
他也是一個異步協(xié)程函數(shù),即通過async定義,并且要在main函數(shù)里面await一個或者是多個協(xié)程,同前面一樣,我可以通過gather或者是wait進(jìn)行組合,對于有返回值的協(xié)程函數(shù),一般就在main里面進(jìn)行結(jié)果的獲取。
第二步:啟動主函數(shù)main
這是python3.7新添加的函數(shù),就一句話,即
asyncio.run(main())
注意:
不再需要顯式的創(chuàng)建事件循環(huán),因?yàn)樵趩觬un函數(shù)的時候,就會自動創(chuàng)建一個新的事件循環(huán)。而且在main中也不需要通過事件循環(huán)去掉用被包裝的協(xié)程函數(shù),只需要向普通函數(shù)那樣調(diào)用即可 ,只不過使用了await關(guān)鍵字而已。
04
協(xié)程編程的優(yōu)點(diǎn)
1、無cpu分時切換線程保存上下文問題(協(xié)程上下文怎么保存)
2、遇到io阻塞切換(怎么實(shí)現(xiàn)的)
3、無需共享數(shù)據(jù)的保護(hù)鎖(為什么)
4、系列文章下篇預(yù)告——介紹低層的API,事件循環(huán)到底是怎么實(shí)現(xiàn)的以及future類的實(shí)現(xiàn)。