免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
python協(xié)程系列(五)——asyncio的核心概念與基本架構(gòu)

聲明:本文針對的是python3.4以后的版本的,因?yàn)閺?.4開始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不過語法上面有稍微的改變。比如在3.4版本中使用@asyncio.coroutine裝飾器和yield from語句,但是在3.5以后的版本中使用async、await兩個關(guān)鍵字代替,雖然語法上稍微有所差異,但是原理是一樣的。本文用最通俗的語言解釋了pythonasyncio背后的一些核心概念,簡要解析了asyncio的設(shè)計架構(gòu),并給出了使用python進(jìn)行asyncio異步編程的一般模板。本文較長:閱讀全文約30min。


目錄

一 一些最重要的概念

     1.1 協(xié)程(coroutine)——本質(zhì)就是一個

          函數(shù)

     1.2 事件循環(huán)——event_loop

     1.3 什么是awaitable對象——即可暫停

          等待的對象

     1.4 什么是task任務(wù)

     1.5 什么是future?

二 asyncio的基本架構(gòu)

     2.1 常見的一些高層API方法

     2.2 Task 類詳解

     2.3 異步函數(shù)的結(jié)果獲取

asyncio異步編程的基本模板

    3.1 python3.7之前的版本

      3.1.1 例子一:無參數(shù)、無返回值

      3.1.2 例子二:有參數(shù)、有返回值

      3.1.3 總結(jié):四步走(針對python3.7

              之前的版本)

    3.2 python3.7版本

      3.2.1 例子一:無參數(shù)、無返回值

      3.2.2 例子二:有參數(shù)、有返回值

      3.2.3 總結(jié):兩步走(針對python3.7)

協(xié)程編程的優(yōu)點(diǎn)


01

一些最重要的概念

協(xié)程(coroutine)——本質(zhì)就是一個函數(shù)


所謂的“協(xié)程”就是一個函數(shù),這個函數(shù)需要有兩個基本的組成要素,第一,需要使用@asyncio.coroutine進(jìn)行裝飾;第二,函數(shù)體內(nèi)一定要有yield from 返回的的generator,或者是說使用yield from 返回另一個協(xié)程對象。
當(dāng)然,這兩個條件并不是硬性規(guī)定的,如果沒有這兩個條件,依然是函數(shù),只不過是普通函數(shù)而已。
怎么判斷一個函數(shù)是不是協(xié)程?通過asyncio.iscoroutine(obj)和asyncio.iscoroutinefunction(func)加以判斷,返回true,則是。

那么協(xié)程函數(shù)有什么作用呢?


(1)result = yield from future

作用一:返回future的結(jié)果。什么是future?后面會講到。當(dāng)協(xié)程函數(shù)執(zhí)行到這一句,協(xié)程會被懸掛起來,知道future的結(jié)果被返回。如果是future被中途取消,則會觸發(fā)CancelledError異常。由于task是future的子類,后面也會介紹,關(guān)于future的所有應(yīng)用,都同樣適用于task


(2)result = yield from coroutine
等候另一個協(xié)程函數(shù)返回結(jié)果或者是觸發(fā)異常

(3)result= yield from task
返回一個task的結(jié)果

(4)return expression
作為一個函數(shù),他本身也是可以返回某一個結(jié)果的

(5)raise exception

事件循環(huán)——event_loop


協(xié)程函數(shù),不是像普通函數(shù)那樣直接調(diào)用運(yùn)行的,必須添加到事件循環(huán)中,然后由事件循環(huán)去運(yùn)行,單獨(dú)運(yùn)行協(xié)程函數(shù)是不會有結(jié)果的,看一個簡單的例子:
import time
import asyncio
async def say_after_time(delay,what):
        await asyncio.sleep(delay)
        print(what)

async def main():
        print(f'開始時間為: {time.time()}')
        await say_after_time(1,'hello')
        await say_after_time(2,'world')
        print(f'結(jié)束時間為: {time.time()}')

loop=asyncio.get_event_loop()    #創(chuàng)建事件循環(huán)對象
#loop=asyncio.new_event_loop()   #與上面等價,創(chuàng)建新的事件循環(huán)
loop.run_until_complete(main())  #通過事件循環(huán)對象運(yùn)行協(xié)程函數(shù)
loop.close()

在python3.6版本中,如果我們單獨(dú)像執(zhí)行普通函數(shù)那樣執(zhí)行一個協(xié)程函數(shù),只會返回一個coroutine對象(python3.7)如下所示:

>>> main()
<coroutine object main at 0x1053bb7c8>


(1)獲取事件循環(huán)對象的幾種方式:
下面幾種方式可以用來獲取、設(shè)置、創(chuàng)建事件循環(huán)對象loop
loop=asyncio.get_running_loop() 返回(獲?。┰诋?dāng)前線程中正在運(yùn)行的事件循環(huán),如果沒有正在運(yùn)行的事件循環(huán),則會顯示錯誤;它是python3.7中新添加的loop=asyncio.get_event_loop() 獲得一個事件循環(huán),如果當(dāng)前線程還沒有事件循環(huán),則創(chuàng)建一個新的事件循環(huán)loop;
loop=asyncio.set_event_loop(loop) 設(shè)置一個事件循環(huán)為當(dāng)前線程的事件循環(huán);
loop=asyncio.new_event_loop() 創(chuàng)建一個新的事件循環(huán)

(2)通過事件循環(huán)運(yùn)行協(xié)程函數(shù)的兩種方式:

①方式一:創(chuàng)建事件循環(huán)對象loop,即asyncio.get_event_loop(),通過事件循環(huán)運(yùn)行協(xié)程函數(shù)
②方式二:直接通過asyncio.run(function_name)運(yùn)行協(xié)程函數(shù)。但是需要注意的是,首先run函數(shù)是python3.7版本新添加的,前面的版本是沒有的;其次,這個run函數(shù)總是會創(chuàng)建一個新的事件循環(huán)并在run結(jié)束之后關(guān)閉事件循環(huán),所以,如果在同一個線程中已經(jīng)有了一個事件循環(huán),則不能再使用這個函數(shù)了,因?yàn)橥粋€線程不能有兩個事件循環(huán),而且這個run函數(shù)不能同時運(yùn)行兩次,因?yàn)樗呀?jīng)創(chuàng)建一個了。即同一個線程中是不允許有多個事件循環(huán)loop的。
asyncio.run()是python3.7 新添加的內(nèi)容,也是后面推薦的運(yùn)行任務(wù)的方式,因?yàn)樗歉邔覣PI,后面會講到它與asyncio.run_until_complete()的差異性,run_until_complete()是相對較低層的API。

注意:到底什么是事件循環(huán)?如何理解?

可以這樣理解:線程一直在各個協(xié)程方法之間永不停歇的游走,遇到一個yield from 或者await就懸掛起來,然后又走到另外一個方法,依次進(jìn)行下去,知道事件循環(huán)所有的方法執(zhí)行完畢。實(shí)際上loop是BaseEventLoop的一個實(shí)例,我們可以查看定義,它到底有哪些方法可調(diào)用。


什么是awaitable對象——即可暫停等待的對象


有三類對象是可等待的,即 coroutines, Tasks, and Futures.
coroutine:本質(zhì)上就是一個函數(shù),一前面的生成器yield和yield from為基礎(chǔ),不再贅述;
Tasks: 任務(wù),顧名思義,就是要完成某件事情,其實(shí)就是對協(xié)程函數(shù)進(jìn)一步的封裝;
Future:它是一個“更底層”的概念,他代表一個一步操作的最終結(jié)果,因?yàn)橐徊讲僮饕话阌糜诤臅r操作,結(jié)果不會立即得到,會在“將來”得到異步運(yùn)行的結(jié)果,故而命名為Future。
三者的關(guān)系,coroutine可以自動封裝成task,而Task是Future的子類。

什么是task任務(wù)


如前所述,Task用來 并發(fā)調(diào)度的協(xié)程,即對協(xié)程函數(shù)的進(jìn)一步包裝?那為什么還需要包裝呢?因?yàn)閱渭兊膮f(xié)程函數(shù)僅僅是一個函數(shù)而已,將其包裝成任務(wù),任務(wù)是可以包含各種狀態(tài)的,異步編程最重要的就是對異步操作狀態(tài)的把控了。

(1)創(chuàng)建任務(wù)(兩種方法):

方法一:task = asyncio.create_task(coro())   # 這是3.7版本新添加的


方法二:task = asyncio.ensure_future(coro())

也可以使用
loop.create_future()
loop.create_task(coro)
也是可以的。

備注:關(guān)于任務(wù)的詳解,會在后面的系列文章繼續(xù)講解,本文只是概括性的說明。

(2)獲取某一個任務(wù)的方法:

方法一:task=asyncio.current_task(loop=None)
返回在某一個指定的loop中,當(dāng)前正在運(yùn)行的任務(wù),如果沒有任務(wù)正在運(yùn)行,則返回None;
如果loop為None,則默認(rèn)為在當(dāng)前的事件循環(huán)中獲取,

方法二:asyncio.all_tasks(loop=None)
返回某一個loop中還沒有結(jié)束的任務(wù)

什么是future?


Future是一個較低層的可等待(awaitable)對象,他表示的是異步操作的最終結(jié)果,當(dāng)一個Future對象被等待的時候,協(xié)程會一直等待,直到Future已經(jīng)運(yùn)算完畢。
Future是Task的父類,一般情況下,已不用去管它們兩者的詳細(xì)區(qū)別,也沒有必要去用Future,用Task就可以了,返回 future 對象的低級函數(shù)的一個很好的例子是 loop.run_in_executor().




02

asyncio的基本架構(gòu)

前面介紹了asyncio里面最為核心的幾個概念,如果能夠很好地理解這些概念,對于學(xué)習(xí)協(xié)程是非常有幫助的,但是按照我個人的風(fēng)格,我會先說asyncio的架構(gòu),理解asyncio的設(shè)計架構(gòu)有助于更好地應(yīng)用和理解。

asyncio分為高層API和低層API,我們都可以使用,就像我前面在講matplotlib的架構(gòu)的時候所講的一樣,我們前面所講的Coroutine和Tasks屬于高層API,而Event Loop 和Future屬于低層API。當(dāng)然asyncio所涉及到的功能遠(yuǎn)不止于此,我們只看這么多。下面是是高層API和低層API的概覽:

High-level APIs

    ●Coroutines and Tasks(本文要寫的)
    ●Streams
    ●Synchronization Primitives
    ●Subprocesses
    ●Queues
    ●Exceptions

Low-level APIs

    ●Event Loop(下一篇要寫的)
    ●Futures
    ●Transports and Protocols
    ●Policies
    ●Platform Support

所謂的高層API主要是指那些asyncio.xxx()的方法


常見的一些高層API方法


(1)運(yùn)行異步協(xié)程
asyncio.run(coro, *, debug=False)  #運(yùn)行一個一步程序,參見上面

(2)創(chuàng)建任務(wù)
task=asyncio.create_task(coro)  #python3.7  ,參見上面
task = asyncio.ensure_future(coro())

(3)睡眠
await asyncio.sleep(delay, result=None, *, loop=None)
這個函數(shù)表示的是:當(dāng)前的那個任務(wù)(協(xié)程函數(shù))睡眠多長時間,而允許其他任務(wù)執(zhí)行。這是它與time.sleep()的區(qū)別,time.sleep()是當(dāng)前線程休息,注意他們的區(qū)別哦。
另外如果提供了參數(shù)result,當(dāng)當(dāng)前任務(wù)(協(xié)程)結(jié)束的時候,它會返回;
loop參數(shù)將會在3.10中移除,這里就不再說了。

(4)并發(fā)運(yùn)行多個任務(wù)
await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
它本身也是awaitable的。
*coros_or_futures是一個序列拆分操作,如果是以個協(xié)程函數(shù),則會自動轉(zhuǎn)換成Task。
當(dāng)所有的任務(wù)都完成之后,返回的結(jié)果是一個列表的形式,列表中值的順序和*coros_or_futures完成的順序是一樣的。
return_exceptions:

False,這是他的默認(rèn)值,第一個出發(fā)異常的任務(wù)會立即返回,然后其他的任務(wù)繼續(xù)執(zhí)行;
True,對于已經(jīng)發(fā)生了異常的任務(wù),也會像成功執(zhí)行了任務(wù)那樣,等到所有的任務(wù)執(zhí)行結(jié)束一起將錯誤的結(jié)果返回到最終的結(jié)果列表里面。

如果gather()本身被取消了,那么綁定在它里面的任務(wù)也就取消了。

(5)防止任務(wù)取消
await asyncio.shield(*arg, *, loop=None)
它本身也是awaitable的。顧名思義,shield為屏蔽、保護(hù)的意思,即保護(hù)一個awaitable 對象防止取消,一般情況下不推薦使用,而且在使用的過程中,最好使用try語句塊更好。

try:
    res = await shield(something())
except CancelledError:
    res = None

(6)設(shè)置timeout——一定要好好理解

await asyncio.wait_for(aw, timeout, *, loop=None)

如果aw是一個協(xié)程函數(shù),會自動包裝成一個任務(wù)task。參見下面的例子:

import asyncio

async def eternity():
    print('我馬上開始執(zhí)行')
    await asyncio.sleep(3600)  #當(dāng)前任務(wù)休眠1小時,即3600秒
    print('終于輪到我了')

async def main():
    # Wait for at most 1 second
    try:
        print('等你3秒鐘哦')
        await asyncio.wait_for(eternity(), timeout=3)  #休息3秒鐘了執(zhí)行任務(wù)
    except asyncio.TimeoutError:
        print('超時了!')

asyncio.run(main())

'''運(yùn)行結(jié)果為:
等你3秒鐘哦
我馬上開始執(zhí)行
超時了!
'''

為什么?首先調(diào)用main()函數(shù),作為入口函數(shù),當(dāng)輸出‘等你3秒鐘哦’之后,main掛起,執(zhí)行eternity,然后打印‘我馬上開始執(zhí)行’,然后eternity掛起,而且要掛起3600秒,大于3,這時候出發(fā)TimeoutError。修改一下:‘’

import asyncio

async def eternity():
    print('我馬上開始執(zhí)行')
    await asyncio.sleep(2)  #當(dāng)前任務(wù)休眠2秒鐘,2<3
    print('終于輪到我了')

async def main():
    # Wait for at most 1 second
    try:
        print('等你3秒鐘哦')
        await asyncio.wait_for(eternity(), timeout=3)  #給你3秒鐘執(zhí)行你的任務(wù)
    except asyncio.TimeoutError:
        print('超時了!')

asyncio.run(main())

'''運(yùn)行結(jié)果為:
等你3秒鐘哦
我馬上開始執(zhí)行
終于輪到我了
'''


總結(jié):當(dāng)異步操作需要執(zhí)行的時間超過waitfor設(shè)置的timeout,就會觸發(fā)異常,所以在編寫程序的時候,如果要給異步操作設(shè)置timeout,一定要選擇合適,如果異步操作本身的耗時較長,而你設(shè)置的timeout太短,會涉及到她還沒做完,就拋出異常了。

(7)多個協(xié)程函數(shù)時候的等候

await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
與上面的區(qū)別是,第一個參數(shù)aws是一個集合,要寫成集合set的形式,比如:
{func(),func(),func3()}
表示的是一系列的協(xié)程函數(shù)或者是任務(wù),其中協(xié)程會自動包裝成任務(wù)。事實(shí)上,寫成列表的形式也是可以的。

注意:該函數(shù)的返回值是兩個Tasks/Futures的集合:
(done, pending)
其中done是一個集合,表示已經(jīng)完成的任務(wù)tasks;pending也是一個集合,表示還沒有完成的任務(wù)。
常見的使用方法為:done, pending = await asyncio.wait(aws)

參數(shù)解釋:
timeout (a float or int), 同上面的含義一樣,需要注意的是,這個不會觸發(fā)asyncio.TimeoutError異常,如果到了timeout還有任務(wù)沒有執(zhí)行完,那些沒有執(zhí)行完的tasks和futures會被返回到第二個集合pending里面。
return_when參數(shù),顧名思義,他表示的是,什么時候wait函數(shù)該返回值。只能夠去下面的幾個值。

ConstantDescription
FIRST_COMPLETEDfirst_completes.當(dāng)任何一個task或者是future完成或者是取消,wait函數(shù)就返回
FIRST_EXCEPTION當(dāng)任何一個task或者是future觸發(fā)了某一個異常,就返回,.如果是所有的task和future都沒有觸發(fā)異常,則等價與下面的 ALL_COMPLETED.
ALL_COMPLETED當(dāng)所有的task或者是future都完成或者是都取消的時候,再返回。

如下面例子所示:

import asyncio
import time

a=time.time()

async def hello1():  #大約2秒
    print('Hello world 01 begin')
    yield from asyncio.sleep(2)
    print('Hello again 01 end')

async def hello2():  #大約3秒
    print('Hello world 02 begin')
    yield from asyncio.sleep(3)
    print('Hello again 02 end')

async def hello3():  #大約4秒
    print('Hello world 03 begin')
    yield from asyncio.sleep(4)
    print('Hello again 03 end')

async def main():   #入口函數(shù)
    done,pending=await asyncio.wait({hello1(),hello2(),hello3()},return_when=asyncio.FIRST_COMPLETED)
    for i in done:
        print(i)
    for j in pending:
        print(j)

asyncio.run(main()) #運(yùn)行入口函數(shù)

b=time.time()
print('---------------------------------------')
print(b-a)

'''運(yùn)行結(jié)果為:
Hello world 02 begin
Hello world 01 begin
Hello world 03 begin
Hello again 01 end
<Task finished coro=<hello1() done, defined at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:46> result=None>
<Task pending coro=<hello3() running at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:61> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394438>()]>>
<Task pending coro=<hello2() running at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:55> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394378>()]>>
---------------------------------------
2.033155679702759
'''

從上面可以看出,hello1()試運(yùn)行結(jié)束了的,hello2()和hello3()還沒結(jié)束。
(8)asyncio.as_completed()函數(shù)

這個函數(shù)我沒有找到合適的中文名稱去描述,所以哪位大神如果知道,望告知,不勝感激!它的函數(shù)原型如下:
asyncio.as_completed(aws, *, loop=None, timeout=None)
第一個參數(shù)aws:同上面一樣,是一個集合{}集合里面的元素是coroutine、task或者future
第三個參數(shù)timeout:意義和上面講的的一樣
那到底什么作用呢?其實(shí)很簡單,個人感覺有點(diǎn)雞肋,從一個例子看起:

import asyncio
import time
import threading

a=time.time()

@asyncio.coroutine
def hello1():
    print('Hello world 01 begin')
    yield from asyncio.sleep(5)  #大約5秒
    print('Hello again 01 end')
    return '哈哈1'

@asyncio.coroutine
def hello2():
    print('Hello world 02 begin')
    yield from asyncio.sleep(3#大約3秒
    print('Hello again 02 end')
    return '哈哈2'

@asyncio.coroutine
def hello3():
    print('Hello world 03 begin')
    yield from asyncio.sleep(4#大約4秒
    print('Hello again 03 end')
    return '哈哈3'

async def main():
    s=asyncio.as_completed({hello1(),hello2(),hello3()})
    for f in s:
        result=await f
        print(result)

asyncio.run(main())

b=time.time()
print('---------------------------------------')
print(b-a)

'''運(yùn)行結(jié)果為:
Hello world 03 begin
Hello world 01 begin
Hello world 02 begin
Hello again 01 end
哈哈1
Hello again 02 end
哈哈2
Hello again 03 end
哈哈3
---------------------------------------
4.0225794315338135
'''

結(jié)論:asyncio.as_completed()函數(shù)返回的是一個可迭代(iterator)的對象,對象的每個元素就是一個future對象,很多小伙伴說,這不是相當(dāng)于沒變嗎?其實(shí)返回的future集合是對參數(shù)的future集合重新組合,組合的順序就是,最先執(zhí)行完的協(xié)程函數(shù)(coroutine、task、future)最先返回,從上面的代碼可知,參數(shù)為
aws={hello1(),hello2(),hello3()},因?yàn)閔ello1大約花費(fèi)5秒、hello2大約花費(fèi)3秒、hello3大約花費(fèi)4秒。返回的結(jié)果為
s={hello2()、hello3()、hello(1)},因?yàn)閔ello2時間最短,故而放在前面,hello1時間最長,故而放在最后面。然后對返回的集合s開始迭代。


Task 類詳解


先來看一下Task類的簡單介紹(英文原文文檔)。

上面的文字描述中推出了幾個非常重要的信息,特在此總結(jié)如下

(1)他是作為一個python協(xié)程對象,和Future對象很像的這么一個對象,但不是線程安全的;他繼承了Future所有的API,,除了Future.set_result()和Future.set_Exception();

(2)使用高層API  asyncio.ccreate_task()創(chuàng)建任務(wù),或者是使用低層API loop.create_task()或者是loop.ensure_future()創(chuàng)建任務(wù)對象;

(3)相比于協(xié)程函數(shù),任務(wù)時有狀態(tài)的,可以使用Task.cancel()進(jìn)行取消,這會觸發(fā)CancelledError異常,使用cancelled()檢查是否取消。

下面介紹Task類常見的一些使用函數(shù)

(1)cancel()
Request the Task to be cancelled.

其實(shí)前面已經(jīng)有所介紹,最好是使用他會出發(fā)CancelledError異常,所以需要取消的協(xié)程函數(shù)里面的代碼最好在try-except語句塊中進(jìn)行,這樣方便觸發(fā)異常,打印相關(guān)信息,但是Task.cancel()沒有辦法保證任務(wù)一定會取消,而Future.cancel()是可以保證任務(wù)一定取消的。可以參見下面的一個例子:

import asyncio

async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        await asyncio.sleep(3600#模擬一個耗時任務(wù)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    #通過協(xié)程創(chuàng)建一個任務(wù),需要注意的是,在創(chuàng)建任務(wù)的時候,就會跳入到異步開始執(zhí)行
    #因?yàn)槭?.7版本,創(chuàng)建一個任務(wù)就相當(dāng)于是運(yùn)行了異步函數(shù)cancel_me
    task = asyncio.create_task(cancel_me()) 
    #等待一秒鐘
    await asyncio.sleep(1)
    print('main函數(shù)休息完了')
    #發(fā)出取消任務(wù)的請求
    task.cancel()  
    try:
        await task  #因?yàn)槿蝿?wù)被取消,觸發(fā)了異常
    except asyncio.CancelledError:
        print('main(): cancel_me is cancelled now')

asyncio.run(main())

'''運(yùn)行結(jié)果為:
cancel_me(): before sleep
main函數(shù)休息完了
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now
'''

運(yùn)行過程分析:

首先run函數(shù)啟動主函數(shù)入口main,在main中,因?yàn)榈谝痪湓捑褪钦{(diào)用異步函數(shù)cancel_me()函數(shù),所以先打印出了第一句話;
然后進(jìn)入cancel_me中的try語句,遇到await,暫停,這時候返回main中執(zhí)行,但是有在main中遇到了await,也會暫停,但是由于main中只需要暫停1秒,而cancel_me中要暫停3600秒,所以等到main的暫停結(jié)束后,接著運(yùn)行main,所以打印出第二句話;
接下來遇到取消任務(wù)的請求task.cancel(),然后繼續(xù)執(zhí)行main里面的try,又遇到了await,接著main進(jìn)入暫停,接下來進(jìn)入到cancel_me函數(shù)中,但是由于main中請求了取消任務(wù),所以那個耗時3600秒的任務(wù)就不再執(zhí)行了,直接觸發(fā)了Cancelled_Error異常,打印出第三句話,接下來又raise一個異常信息;
接下來執(zhí)行cancel_me的finally,打印出第四句話,此時cancel_me執(zhí)行完畢,由于他拋出了一個異常,返回到主程序main中,觸發(fā)異常,打印出第五句話。

(2)done()
當(dāng)一個被包裝得協(xié)程既沒有觸發(fā)異常、也沒有被取消的時候,意味著它是done的,返回true。

(3)result()
返回任務(wù)的執(zhí)行結(jié)果,
當(dāng)任務(wù)被正常執(zhí)行完畢,則返回結(jié)果;
當(dāng)任務(wù)被取消了,調(diào)用這個方法,會觸發(fā)CancelledError異常;
當(dāng)任務(wù)返回的結(jié)果是無用的時候,則調(diào)用這個方法會觸發(fā)InvalidStateError;
當(dāng)任務(wù)出發(fā)了一個異常而中斷,調(diào)用這個方法還會再次觸發(fā)這個使程序中斷的異常。

(4)exception()

返回任務(wù)的異常信息,觸發(fā)了什么異常,就返回什么異常,如果任務(wù)是正常執(zhí)行的無異常,則返回None;
當(dāng)任務(wù)被取消了,調(diào)用這個方法會觸發(fā)CancelledError異常;
當(dāng)任務(wù)沒有做完,調(diào)用這個方法會觸發(fā)InvalidStateError異常。
下面還有一些不常用的方法,如下:

(5)add_done_callback(callback, *, context=None)
(6)remove_done_callback(callback)
(7)get_stack(*, limit=None)
(8)print_stack(*, limit=None, file=None)
(9)all_tasks(loop=None),這是一個類方法
(10)current_task(loop=None),這是一個類方法


異步函數(shù)的結(jié)果獲取


對于異步編程、異步函數(shù)而言,最重要的就是異步函數(shù)調(diào)用結(jié)束之后,獲取異步函數(shù)的返回值,我們可以有以下幾種方式來獲取函數(shù)的返回值,第一是直接通過Task.result()來獲??;第二種是綁定一個回調(diào)函數(shù)來獲取,即函數(shù)執(zhí)行完畢后調(diào)用一個函數(shù)來獲取異步函數(shù)的返回值。

(1)直接通過result來獲取

import asyncio
import time


async def hello1(a,b):
    print('Hello world 01 begin')
    await asyncio.sleep(3)  #模擬耗時任務(wù)3秒
    print('Hello again 01 end')
    return a+b

coroutine=hello1(10,5)
loop = asyncio.get_event_loop()                #第一步:創(chuàng)建事件循環(huán)
task=asyncio.ensure_future(coroutine)         #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)列表
loop.run_until_complete(task)                  #第三步:通過事件循環(huán)運(yùn)行
print('-------------------------------------')
print(task.result())
loop.close() 

'''運(yùn)行結(jié)果為
Hello world 01 begin
Hello again 01 end
-------------------------------------
15
'''

(2)通過定義回調(diào)函數(shù)來獲取

import asyncio
import time


async def hello1(a,b):
    print('Hello world 01 begin')
    await asyncio.sleep(3)  #模擬耗時任務(wù)3秒
    print('Hello again 01 end')
    return a+b

def callback(future):   #定義的回調(diào)函數(shù)
    print(future.result())

loop = asyncio.get_event_loop()                #第一步:創(chuàng)建事件循環(huán)
task=asyncio.ensure_future(hello1(10,5))       #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)
task.add_done_callback(callback)                      #并被任務(wù)綁定一個回調(diào)函數(shù)

loop.run_until_complete(task)                  #第三步:通過事件循環(huán)運(yùn)行
loop.close()                                   #第四步:關(guān)閉事件循環(huán)


'''運(yùn)行結(jié)果為:
Hello world 01 begin
Hello again 01 end
15
'''

注意:所謂的回調(diào)函數(shù),就是指協(xié)程函數(shù)coroutine執(zhí)行結(jié)束時候會調(diào)用回調(diào)函數(shù)。并通過參數(shù)future獲取協(xié)程執(zhí)行的結(jié)果。我們創(chuàng)建的task和回調(diào)里的future對象,實(shí)際上是同一個對象,因?yàn)閠ask是future的子類。




03

asyncio異步編程的基本模板

事實(shí)上,在使用asyncio進(jìn)行異步編程的時候,語法形式往往是多樣性的,雖然理解異步編程的核心思想很重要,但是實(shí)現(xiàn)的時候終究還是要編寫語句的,本次給出的模板,是兩個不同的例子,例子一是三個異步方法,它們都沒有參數(shù),沒有返回值,都模擬一個耗時任務(wù);例子二是三個異步方法,都有參數(shù),都有返回值。


python3.7之前的版本


例子一:無參數(shù)、無返回值
import asyncio
import time

a=time.time()

async def hello1():
    print('Hello world 01 begin')
    await asyncio.sleep(3)  #模擬耗時任務(wù)3秒
    print('Hello again 01 end')

async def hello2():
    print('Hello world 02 begin')
    await asyncio.sleep(2)   #模擬耗時任務(wù)2秒
    print('Hello again 02 end')

async def hello3():
    print('Hello world 03 begin')
    await asyncio.sleep(4)   #模擬耗時任務(wù)4秒
    print('Hello again 03 end')

loop = asyncio.get_event_loop()                #第一步:創(chuàng)建事件循環(huán)
tasks = [hello1(), hello2(),hello3()]          #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)列表
loop.run_until_complete(asyncio.wait(tasks))   #第三步:通過事件循環(huán)運(yùn)行
loop.close()                                   #第四步:取消事件循環(huán)

'''運(yùn)行結(jié)果為:
Hello world 02 begin
Hello world 03 begin
Hello world 01 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
'''


例子二:有參數(shù)、有返回值

import asyncio
import time


async def hello1(a,b):
    print('Hello world 01 begin')
    await asyncio.sleep(3)  #模擬耗時任務(wù)3秒
    print('Hello again 01 end')
    return a+b

async def hello2(a,b):
    print('Hello world 02 begin')
    await asyncio.sleep(2)   #模擬耗時任務(wù)2秒
    print('Hello again 02 end')
    return a-b

async def hello3(a,b):
    print('Hello world 03 begin')
    await asyncio.sleep(4)   #模擬耗時任務(wù)4秒
    print('Hello again 03 end')
    return a*b

loop = asyncio.get_event_loop()                #第一步:創(chuàng)建事件循環(huán)
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
tasks = [task1,task2,task3]                    #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)列表
loop.run_until_complete(asyncio.wait(tasks))   #第三步:通過事件循環(huán)運(yùn)行
print(task1.result())                               #并且在所有的任務(wù)完成之后,獲取異步函數(shù)的返回值   
print(task2.result())
print(task3.result())
loop.close()                                   #第四步:關(guān)閉事件循環(huán)

'''運(yùn)行結(jié)果為:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''



總結(jié):四步走(針對python3.7之前的版本)

第一步·:構(gòu)造事件循環(huán)

loop=asyncio.get_running_loop() #返回(獲?。┰诋?dāng)前線程中正在運(yùn)行的事件循環(huán),如果沒有正在運(yùn)行的事件循環(huán),則會顯示錯誤;它是python3.7中新添加的

loop=asyncio.get_event_loop() #獲得一個事件循環(huán),如果當(dāng)前線程還沒有事件循環(huán),則創(chuàng)建一個新的事件循環(huán)loop;

loop=asyncio.set_event_loop(loop) #設(shè)置一個事件循環(huán)為當(dāng)前線程的事件循環(huán);

loop=asyncio.new_event_loop()  #創(chuàng)建一個新的事件循環(huán)

第二步:將一個或者是多個協(xié)程函數(shù)包裝成任務(wù)Task

#高層API
task = asyncio.create_task(coro(參數(shù)列表))   # 這是3.7版本新添加的
task = asyncio.ensure_future(coro(參數(shù)列表)) 

#低層API
loop.create_future(coro)
loop.create_task(coro)

'''需要注意的是,在使用Task.result()獲取協(xié)程函數(shù)結(jié)果的時候,使用asyncio.create_task()卻會顯示錯
但是使用asyncio.ensure_future卻正確,本人暫時不知道原因,哪位大神知道,望告知,不勝感激!'''

第三步:通過事件循環(huán)運(yùn)行

loop.run_until_complete(asyncio.wait(tasks))  #通過asyncio.wait()整合多個task

loop.run_until_complete(asyncio.gather(tasks))  #通過asyncio.gather()整合多個task

loop.run_until_complete(task_1)  #單個任務(wù)則不需要整合

loop.run_forever()  #但是這個方法在新版本已經(jīng)取消,不再推薦使用,因?yàn)槭褂闷饋聿缓啙?/span>

'''
使用gather或者wait可以同時注冊多個任務(wù),實(shí)現(xiàn)并發(fā),但他們的設(shè)計是完全不一樣的,在前面的2.1.(4)中已經(jīng)討論過了,主要區(qū)別如下:
(1)參數(shù)形式不一樣
gather的參數(shù)為 *coroutines_or_futures,即如這種形式
      tasks = asyncio.gather(*[task1,task2,task3])或者
      tasks = asyncio.gather(task1,task2,task3)
      loop.run_until_complete(tasks)
wait的參數(shù)為列表或者集合的形式,如下
      tasks = asyncio.wait([task1,task2,task3])
      loop.run_until_complete(tasks)
(2)返回的值不一樣
gather的定義如下,gather返回的是每一個任務(wù)運(yùn)行的結(jié)果,
      results = await asyncio.gather(*tasks) 
wait的定義如下,返回dones是已經(jīng)完成的任務(wù),pending是未完成的任務(wù),都是集合類型
 done, pending = yield from asyncio.wait(fs)
(3)后面還會講到他們的進(jìn)一步使用
'''

簡單來說:async.wait會返回兩個值:done和pending,done為已完成的協(xié)程Task,pending為超時未完成的協(xié)程Task,需通過future.result調(diào)用Task的result。而async.gather返回的是已完成Task的result。

第四步:關(guān)閉事件循環(huán)

loop.close()

'''
以上示例都沒有調(diào)用 loop.close,好像也沒有什么問題。所以到底要不要調(diào) loop.close 呢?
簡單來說,loop 只要不關(guān)閉,就還可以再運(yùn)行:
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()
但是如果關(guān)閉了,就不能再運(yùn)行了:
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3))  # 此處異常
建議調(diào)用 loop.close,以徹底清理 loop 對象防止誤用
'''



python3.7版本


在最新的python3.7版本中,asyncio又引進(jìn)了一些新的特性和API


例子一:無參數(shù)、無返回值
import asyncio
import time


async def hello1():
    print('Hello world 01 begin')
    await asyncio.sleep(3)  #模擬耗時任務(wù)3秒
    print('Hello again 01 end')

async def hello2():
    print('Hello world 02 begin')
    await asyncio.sleep(2)   #模擬耗時任務(wù)2秒
    print('Hello again 02 end')

async def hello3():
    print('Hello world 03 begin')
    await asyncio.sleep(4)   #模擬耗時任務(wù)4秒
    print('Hello again 03 end')

async def main():
    results=await asyncio.gather(hello1(),hello2(),hello3())
    for result in results:
        print(result)     #因?yàn)闆]返回值,故而返回None

asyncio.run(main())

'''運(yùn)行結(jié)果為:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
None
None
None
'''


例子二:有參數(shù)、有返回值

import asyncio
import time


async def hello1(a,b):
    print('Hello world 01 begin')
    await asyncio.sleep(3)  #模擬耗時任務(wù)3秒
    print('Hello again 01 end')
    return a+b

async def hello2(a,b):
    print('Hello world 02 begin')
    await asyncio.sleep(2)   #模擬耗時任務(wù)2秒
    print('Hello again 02 end')
    return a-b

async def hello3(a,b):
    print('Hello world 03 begin')
    await asyncio.sleep(4)   #模擬耗時任務(wù)4秒
    print('Hello again 03 end')
    return a*b

async def main():
    results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5))
    for result in results:
        print(result)

asyncio.run(main())

'''運(yùn)行結(jié)果為:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''


總結(jié):兩步走(針對python3.7)

第一步:構(gòu)建一個入口函數(shù)main
他也是一個異步協(xié)程函數(shù),即通過async定義,并且要在main函數(shù)里面await一個或者是多個協(xié)程,同前面一樣,我可以通過gather或者是wait進(jìn)行組合,對于有返回值的協(xié)程函數(shù),一般就在main里面進(jìn)行結(jié)果的獲取。

第二步:啟動主函數(shù)main
這是python3.7新添加的函數(shù),就一句話,即
asyncio.run(main())

注意:
不再需要顯式的創(chuàng)建事件循環(huán),因?yàn)樵趩觬un函數(shù)的時候,就會自動創(chuàng)建一個新的事件循環(huán)。而且在main中也不需要通過事件循環(huán)去掉用被包裝的協(xié)程函數(shù),只需要向普通函數(shù)那樣調(diào)用即可 ,只不過使用了await關(guān)鍵字而已。



04

協(xié)程編程的優(yōu)點(diǎn)


1、無cpu分時切換線程保存上下文問題(協(xié)程上下文怎么保存)

2、遇到io阻塞切換(怎么實(shí)現(xiàn)的)

3、無需共享數(shù)據(jù)的保護(hù)鎖(為什么)

4、系列文章下篇預(yù)告——介紹低層的API,事件循環(huán)到底是怎么實(shí)現(xiàn)的以及future類的實(shí)現(xiàn)。


本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
第101天: Python asyncio
如何精確控制 asyncio 中并發(fā)運(yùn)行的多個任務(wù)
Python黑魔法 --- 異步IO( asyncio) 協(xié)程
python 異步 async/await -1.一文理解什么是協(xié)程
Python并發(fā)編程從入門到進(jìn)階
如何使用Python異步編程進(jìn)行API調(diào)用 | 區(qū)塊鏈研究實(shí)驗(yàn)室
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服