免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
【Swoole系列4.4】協(xié)程間通信Channel及WithGroup

協(xié)程間通信Channel及WithGroup

在進程篇的學(xué)習中,我們花過很大的篇幅講過進程間的通信問題。但是在協(xié)程中,這個問題其實并不是很重要,為什么呢?因為從基礎(chǔ)的理論我們就知道,協(xié)程是基于線程的,而線程在同一個進程中是共享內(nèi)存的,通信其實并不會有太大的問題。而進程因為有隔離問題的存在,所以進程之間的通信我們就講了很多。關(guān)于協(xié)程的通信,Swoole 直接就提供了一個 Channel 功能來幫助我們實現(xiàn)。

Channel

Channel ,其實可以理解為就是一個消息隊列,只不過它是協(xié)程間的消息隊列,多個協(xié)程可以通過 push 和 pop 操作來生產(chǎn)和消費消息。Channel 是基于協(xié)程的,所以說它是沒有辦法跨進程使用的,我們后面要講的 并發(fā)調(diào)用 和 連接池 都是基于 Channel 的。

Channel 支持多生產(chǎn)者協(xié)程和多消費者協(xié)程,底層自動實現(xiàn)了協(xié)程的切換和調(diào)度。它與 Array 很類似,僅占用內(nèi)存,沒有額外別的資源申請,因此也就沒有 IO 消耗,效率速度可想而知。它在底層使用 PHP 的引用計數(shù)實現(xiàn),沒有內(nèi)存的拷貝問題,巨大字符和數(shù)組也不會產(chǎn)生額外的消耗,也是零拷貝技術(shù)的實現(xiàn)。

所謂零拷貝就是說,傳統(tǒng)的 IO 標準操作會在系統(tǒng)內(nèi)核地址空間的緩沖區(qū)和應(yīng)用程序地址空間定義的緩沖區(qū)之間進行傳輸,效率提升了但傳輸過程中數(shù)據(jù)需要在緩沖區(qū)進行拷貝復(fù)制。而零拷貝則避免了不同存儲塊之間的拷貝,能夠更加有效利用系統(tǒng)資源,極大提升性能。

最后匯總一句話,Channel 性能 diao 炸天。而且,Channel 在 Go 語言中是非常非常重要的一個能力,掌握好 Swoole 中的 Channel ,再學(xué)習 Go ,或者之前已經(jīng)學(xué)習過 Go ,再來看這里的話,都會非常親切。對于協(xié)程編程,Channel 一定要牢牢掌握。

\Swoole\Coroutine\run(function(){
    $channel = new \Swoole\Coroutine\Channel(1);

    go(function() use ($channel){
        for($i = 0; $i < 3; $i++) {
            $channel->push(['rand' => rand(10009999), 'index' => $i]);
            echo "{$i}\n";
        }
        $channel->close();
    });

    go(function() use($channel){
        while(1){
            co::sleep(1);
            $data = $channel->pop();
            if($channel->errCode == SWOOLE_CHANNEL_CLOSED){
                break;
            }
            var_dump($data);
        }
    });
});

//0
//1
//array(2) {
//    ["rand"]=>
//  int(8020)
//  ["index"]=>
//  int(0)
//}
//2
//array(2) {
//    ["rand"]=>
//  int(5072)
//  ["index"]=>
//  int(1)
//}
//array(2) {
//    ["rand"]=>
//  int(5950)
//  ["index"]=>
//  int(2)
//}


使用也非常簡單,上面的代碼即使我不說你也應(yīng)該能看明白。首先實例化一個 \Swoole\Coroutine\Channel 對象,它的構(gòu)造參數(shù)是設(shè)置隊列的容量,比如我們設(shè)置為 1 則隊列只能有一條數(shù)據(jù)。當這一條數(shù)據(jù)沒有被消費的時候,后續(xù)的 push() 會掛起并等待隊列有空間繼續(xù)存放數(shù)據(jù)。同理,pop() 也會在隊列為空的時候掛起等待,它的參數(shù)就是等待時間,超過時間了則會結(jié)束,默認情況下是 -1 ,表示會一直等待。

當我們操作完添加的協(xié)程之后,調(diào)用 close() 關(guān)閉隊列,然后在消費端查看 Channel 隊列是否已經(jīng)關(guān)閉,如果關(guān)閉了就退出循環(huán),最終程序執(zhí)行結(jié)束。

具體的調(diào)用順序我們從注釋中來看,首先是打印的 0 和 1 ,貌似一次塞進了兩條數(shù)據(jù),但其實我們的 Channel 容量只有 1 ,只是說協(xié)程2已經(jīng)消費了,但在打印出來效果上看卻是 1 先輸出了,這是并發(fā)執(zhí)行的特點,如果同時進行,出現(xiàn)的順序是不一定的。消費了一條數(shù)據(jù)之后第一個協(xié)程又打印出來了 2 ,這時隊列添加操作結(jié)束,調(diào)用 close() 關(guān)閉隊列。之后第二個協(xié)程會繼續(xù)消費完隊列。想看清楚的話,可以在第二個協(xié)程的 while 循環(huán)中加一個 co::sleep(1); 看看效果。

官網(wǎng)給出的例子是另一種形式,根據(jù) pop 過期時間的。

use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\run;

run(function(){
    $channel = new Channel(1);
    Coroutine::create(function () use ($channel) {
        for($i = 0; $i < 10; $i++) {
            Coroutine::sleep(1.0);
            $channel->push(['rand' => rand(1000, 9999), 'index' => $i]);
            echo "{$i}\n";
        }
    });
    Coroutine::create(function () use ($channel) {
        while(1) {
            $data = $channel->pop(2.0);
            if ($data) {
                var_dump($data);
            } else {
                assert($channel->errCode === SWOOLE_CHANNEL_TIMEOUT);
                break;
            }
        }
    });
});

總共就這么點東西,是不是很簡單(簡單個毛線)。push() 和 pop() 直接在底層就為我們實現(xiàn)了掛起和執(zhí)行的調(diào)度操作。其實大家也能猜到,內(nèi)部同樣是 yield() 和 resume() 的來回切換操作。更具體的應(yīng)用大家可以學(xué)習一下 Go 語言中的 Channel ,前面也說過了在 Go 中主要就是依靠 Channel 來進行協(xié)程管理,非常強大。

多協(xié)程生產(chǎn)消費及其它方法屬性

接下來我們直接在一個例子中演示多協(xié)程操作 Channel 以及 Channel 的一些其它的相關(guān)方法和屬性。

\Swoole\Coroutine\run(function () {
    $channel = new chan(2);
    $chan2 = new chan(2); // 總控,有兩個生產(chǎn)者,兩個消費者,要知道何時關(guān)閉 $channel


    go(function () use ($channel, $chan2) {
        for ($i = 0; $i < 3; $i++) {
            co::sleep(rand(1,2));
            $channel->push(['rand' => rand(10009999), 'index' => $i]);
            echo "入 channel 隊協(xié)程:" . co::getCid() . ",下標:{$i}\n";
        }
        echo "入 chan2 隊協(xié)程:" . co::getCid();
        $chan2->push(1);
    });
    go(function () use ($channel, $chan2) {
        for ($i = 1; $i < 4; $i++) {
            co::sleep(rand(1,2));
            $channel->push(['rand' => rand(10009999), 'index' => $i * 10]);
            echo "入 channel 隊協(xié)程:" . co::getCid() . ",下標:{$i}\n";
        }
        echo "入 chan2 隊協(xié)程:" . co::getCid();
        $chan2->push(1);
    });

    echo "================", PHP_EOL;
    var_dump($channel->stats());
    var_dump($channel->length());
    var_dump($channel->isEmpty());
    var_dump($channel->isFull());
    var_dump($channel->capacity);
    var_dump($channel->errCode);
    echo "================", PHP_EOL;

    go(function () use ($channel) {
        while (1) {
            co::sleep(rand(2,3));
            if ($channel->errCode == SWOOLE_CHANNEL_CLOSED) {
                break;
            }
            $data = $channel->pop();
            if($data == false){
                break;
            }
            echo "%%%%%\n";
            echo " cid:", co::getCid(), "消費 channel !\n";
            var_dump($data);
            echo "%%%%%\n";
        }
    });

    go(function () use ($channel) {
        while (1) {
            co::sleep(rand(2,3));
            if ($channel->errCode == SWOOLE_CHANNEL_CLOSED) {
                break;
            }
            $data = $channel->pop();
            if($data == false){
                break;
            }
            echo "%%%%%\n";
            echo " cid:", co::getCid(), "消費 channel !\n";
            var_dump($data);
            echo "%%%%%\n";
        }
    });

    for ($i = $chan2->capacity; $i > 0; $i--) {
        $chan2->pop();
        echo " 主線程消費 chan2: {$i} !\n";
    }
    $channel->close();

});
//================
//array(3) {
//    ["consumer_num"]=>
//  int(0)
//  ["producer_num"]=>
//  int(0)
//  ["queue_num"]=>
//  int(0)
//}
//int(0)
//bool(true)
//bool(false)
//int(2)
//int(0)
//================
//入 channel 隊協(xié)程:2,下標:0
//入 channel 隊協(xié)程:3,下標:1
//入 channel 隊協(xié)程:3,下標:2
//%%%%%
// cid:5消費 channel !
//array(2) {
//    ["rand"]=>
//  int(2792)
//  ["index"]=>
//  int(0)
//}
//%%%%%
//%%%%%
// cid:4消費 channel !
//array(2) {
//    ["rand"]=>
//  int(7298)
//  ["index"]=>
//  int(10)
//}
//%%%%%
//入 channel 隊協(xié)程:2,下標:1
//入 channel 隊協(xié)程:2,下標:2
//入 chan2 隊協(xié)程:2 主線程消費 chan2: 2 !
//%%%%%
// cid:4消費 channel !
//array(2) {
//    ["rand"]=>
//  int(3729)
//  ["index"]=>
//  int(20)
//}
//%%%%%
//入 channel 隊協(xié)程:3,下標:3
//入 chan2 隊協(xié)程:3 主線程消費 chan2: 1 !
//%%%%%
// cid:5消費 channel !
//array(2) {
//    ["rand"]=>
//  int(3590)
//  ["index"]=>
//  int(1)
//}
//%%%%%
//%%%%%
// cid:4消費 channel !
//array(2) {
//    ["rand"]=>
//  int(2667)
//  ["index"]=>
//  int(2)
//}
//%%%%%
//%%%%%
// cid:5消費 channel !
//array(2) {
//    ["rand"]=>
//  int(3430)
//  ["index"]=>
//  int(30)
//}
//%%%%%

內(nèi)容比較多,也復(fù)雜了很多,我們一塊一塊來看。

現(xiàn)在的情況是,我們有四個協(xié)程,兩個生產(chǎn),兩個消費,那么問題來了,我們怎么知道生產(chǎn)者生產(chǎn)完了呢?也就是說,我們怎么知道 $channel 應(yīng)該在什么時候 close() 呢?畢竟有兩個協(xié)程同時在向同一個 Channels 中 push 數(shù)據(jù)啊。

這里也是從 Go 那邊找的一個例子。我們可以再建一個 Channels ,就叫做 $chan2 ,然后在主進程中循環(huán) pop 它,并在兩個生產(chǎn)者協(xié)程中 push 數(shù)據(jù)。當兩個生產(chǎn)者協(xié)程向 $channel 添加完成之后,外面主進程的循環(huán) pop 才會結(jié)束,我們就關(guān)閉 $channel ,兩個消費的協(xié)程也就結(jié)束了。

在這個幾個例子中,加了很多 co::sleep() ,為的是可以方便地看出協(xié)程交替執(zhí)行的效果,實際工作中是不用加的,因為我們這是例子,沒有什么耗時操作,執(zhí)行太快了,看不出多個協(xié)程一起工作的效果。而在實際工作中,可能有各種 IO 情況導(dǎo)致處理時間會有不同,效果就會比較明顯。

另外我們使用了 chan() 這個語法糖實例化了一個 Channel 對象,它和 \Swoole\Coroutine\Channel 對象是一樣的。同時我們還打印了一下 Channel 對象的一些相關(guān)方法屬性,大家可以看一下。

stats() 返回隊列信息,主要包括下面這些內(nèi)容:

  • consumer_num 消費者數(shù)量,表示當前通道為空,有 N 個協(xié)程正在等待其他協(xié)程調(diào)用 push 方法生產(chǎn)數(shù)據(jù)

  • producer_num 生產(chǎn)者數(shù)量,表示當前通道已滿,有 N 個協(xié)程正在等待其他協(xié)程調(diào)用 pop 方法消費數(shù)據(jù)

  • queue_num 通道中的元素數(shù)量

很明顯,我們的 producer_num 和 queue_num 都會是 2 ,目前隊列是滿隊的狀態(tài),因為主進程中打印的,所有的協(xié)程還在休眠狀態(tài),所以看不到什么東西,大家可以嘗試注釋掉生產(chǎn)者里面的休眠代碼,就可以看到主進程打印的相關(guān)信息了。

length() 表示的就是 queue_num 的信息,也就是隊列的元素數(shù)量或者說是隊列長度。isEmpty() 表示隊列是否為空,isFull() 表示隊列是否已滿。capacity 屬性就是我們在構(gòu)造函數(shù)設(shè)置的隊列長度,errCode 表示當前的錯誤信息,我們已經(jīng)用過這個了。

好吧,我承認,我看得也暈,其實最好的方式是大家直接去學(xué)一下 Go ,然后對比著那邊的 Channel 一起來看。

WaitGroup

基于 Channel 可以實現(xiàn)很多功能,緊接著我們就來講一個另一個比較重要的功能,那就是協(xié)程的 WaitGroup 功能。

如果你學(xué)過 Go 語言,那么 sync.WaitGroup 應(yīng)該不會陌生。如果你沒學(xué)過,也不用擔心。先看代碼,再來解釋。

\Swoole\Coroutine\run(function(){
   $wg = new \Swoole\Coroutine\WaitGroup();

   $wg->add();
   $wg->add();

   go(function() use($wg){
       echo "協(xié)程1,cid:" . Co::getCid() , " start", PHP_EOL;
       sleep(1);
       echo "協(xié)程1,cid:" . Co::getCid() , " end", PHP_EOL;
       $wg->done();
   });

    go(function()use($wg){
        echo "協(xié)程2,cid:" . Co::getCid() , " start", PHP_EOL;
        sleep(2);
        echo "協(xié)程2,cid:" . Co::getCid() , " end", PHP_EOL;
        $wg->done();
    });

    $wg->wait(); // wait1

    echo "繼續(xù)執(zhí)行",PHP_EOL;

    $wg->add();
    go(function()use($wg){
        echo "協(xié)程3,cid:" . Co::getCid() , " start", PHP_EOL;
        sleep(3);
        echo "協(xié)程3,cid:" . Co::getCid() , " end", PHP_EOL;
        $wg->done();
    });
    $wg->wait();

});

//協(xié)程1,cid:2 start
//協(xié)程2,cid:3 start
//協(xié)程1,cid:2 end
//協(xié)程2,cid:3 end
//繼續(xù)執(zhí)行
//協(xié)程3,cid:4 start
//協(xié)程3,cid:4 end

直接看代碼和運行結(jié)果,你能猜到這是在干嘛嗎?好吧,我其實也沒學(xué)完 Go ,但是咱們 PHPer 對前端還是了解一些的吧,async await 了解過不?或者說 Promise 總聽說過吧。這里其實就和 JS 的 Promise 的效果很像。

我們首先實例化一個 \Swoole\Coroutine\WaitGroup 對象。然后通過 add() 方法添加引用計數(shù)。因為要實現(xiàn)兩個協(xié)程,我們就需要添加兩個引用計數(shù)。然后在協(xié)程內(nèi)部,通過 done() 方法標明這個協(xié)程執(zhí)行完了,或者說也可以認為它會將引用計數(shù)減少。

然后在外部調(diào)用 wait() 的時候,會等待引用計數(shù)歸零,才會繼續(xù)執(zhí)行后面的代碼。

再說得通俗點,如果沒有 wait() 那么這兩個協(xié)程執(zhí)行之后,后面的代碼也會緊跟著執(zhí)行,馬上就會輸入 “繼續(xù)執(zhí)行” 四個字,并且后面的協(xié)程3也會開始運行。但是,有中間的那個 wait() 的話,整個協(xié)程容器就會等待前面兩個協(xié)程完成執(zhí)行之后,也就是 done() 完了,才會繼續(xù)執(zhí)行后面的代碼。

你看,這是不是真的非常像 Promise 的功能。JS 中引入這個功能是為了解決什么問題呢?那就是異步執(zhí)行同步返回的功能。在前端頁面上,多個 Ajax 請求同時發(fā)出,返回時間是不確定的,而我們的前端業(yè)務(wù)可能是需要所有的請求都返回結(jié)果之后,才能進行后續(xù)的操作,這時候就可以用 Promise 來實現(xiàn)這樣的功能了。WithGroup 也是同樣的概念。

上面的例子中,輸出的結(jié)果很清晰的就能看出來,協(xié)程1和協(xié)程2都end之后,才打印了“繼續(xù)執(zhí)行”,并開始執(zhí)行協(xié)程3。假如你注釋掉中間的那個 wait() ,也就是注釋了 wait1 的那個,那么輸出的結(jié)果就會是這樣的。

// 注釋中間 wait
//協(xié)程1,cid:2 start
//協(xié)程2,cid:3 start
//繼續(xù)執(zhí)行
//協(xié)程3,cid:4 start
//協(xié)程1,cid:2 end
//協(xié)程2,cid:3 end
//協(xié)程3,cid:4 end

能看出來不同吧。如果 add() 和 done() 的數(shù)量不一樣,都會報錯,因此這兩個方法是成對出現(xiàn)了,有一個 add() 就要有一個 done() 去清理計數(shù)。wait() 方法中其實是有一個 Channel 的 pop() 在等待阻塞,當引用計數(shù)不為 0 的時候,這個 pop() 就一直阻塞著,而當計數(shù)為 0 后,就直接返回。

為什么我知道它的實現(xiàn)呢?WithGroup 組件是純 PHP 代碼實現(xiàn)的,也是包含在協(xié)程的 Library 工具包中的,大家可以自己去看它的源碼,真的就是通過 Channel 實現(xiàn)的,而且非常簡單好懂,Github 地址在文末。

包括 Go 語言,其實也更推薦的是通過 Channel 來進行協(xié)程管理的,因此,咱們在 Swoole 中,也盡量多使用 Channel 吧,畢竟萬一將來要學(xué) Go 呢?這不就水到渠成了嘛!

總結(jié)

今天學(xué)習的內(nèi)容非常重要,它是我們后面要學(xué)習的 并發(fā)調(diào)用 和 連接池 的基礎(chǔ)。所幸的是,這兩個東西理解難度不是那么大,但確實還是有一定的難度,如果你現(xiàn)在正在學(xué) Go ,建議一起看,一起學(xué),效果更好哦。

Channel 就是一個協(xié)程間可以共享通信的隊列系統(tǒng),非常類似于我們進程中的隊列消息通信。而 WithGroup 則是實現(xiàn)了一個類似于 Promise 的功能,實現(xiàn)異步并發(fā)同步返回的效果,它的底層實際上還是 Channel 。

測試代碼:

https://github.com/zhangyue0503/swoole/blob/main/4.Swoole%E5%8D%8F%E7%A8%8B/source/4.4%E5%8D%8F%E7%A8%8B%E9%97%B4%E9%80%9A%E4%BF%A1Channel%E5%8F%8AWithGroup.php

參考文檔:

https://wiki.swoole.com/#/coroutine/channel

https://wiki.swoole.com/#/coroutine/wait_group

WithGroup源碼:https://github.com/swoole/library/blob/master/src/core/Coroutine/WaitGroup.php

本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
C++ 協(xié)程與網(wǎng)絡(luò)編程 協(xié)程
解決CPU嚴重消耗的問題
Python并發(fā)編程協(xié)程(Coroutine)之Gevent
我自然 | Go
云風的 BLOG: skynet 的 UDP 支持
C#代替go采用的CSP并發(fā)模型實現(xiàn)
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服