免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
DotNetty發(fā)送請求的最佳實(shí)踐

長鏈接發(fā)送request/response時(shí), 絕大部分包都是小包, 而每個(gè)小包都要消耗一個(gè)IP包, 成本大約是20-30us, 普通千兆網(wǎng)卡的pps大約是60Wpps, 所以想要提高長鏈接密集IO的應(yīng)用性能, 需要做包的合并, 也稱為了scatter/gather io或者vector io.

 

在linux下有readv/writev就是對應(yīng)這個(gè)需求的, 減少系統(tǒng)調(diào)用, 減少pps, 提高網(wǎng)卡的吞吐量. 關(guān)于readv提高讀的速度, 可以看看陳碩muduo里面對于readv的使用, 思路是就是在棧上面弄一個(gè)64KB的數(shù)組, 組成readv的第二塊buffer, 從而盡可能一次性把socket緩沖區(qū)的內(nèi)容全部出來(參見5). 這里不再贅述, 重點(diǎn)描述DotNetty下面怎么做Gathering Write.

 

首先得有一個(gè)Channel<IMessage>, 用來做寫的緩沖, 讓業(yè)務(wù)關(guān)心業(yè)務(wù), 網(wǎng)絡(luò)關(guān)心網(wǎng)絡(luò), 否則每個(gè)業(yè)務(wù)都WriteAndFlushAsync, 那是不太可能有合并發(fā)送的.

 

然后就是SendingLoop的主循環(huán), 里面不停的從Channel里面TryRead包, 然后WriteAsync, 隔幾個(gè)包Flush一次. 類似的思想在Orleans Network里面也存在.

 

 1 public void RunSendLoopAsync(IChannel channel) 2 { 3     var allocator = channel.Allocator; 4     var reader = this.queue.Reader; 5     Task.Run(async () =>  6     { 7         while (!this.stop)  8         { 9             var more = await reader.WaitToReadAsync();10             if (!more) 11             {12                 break;13             }14 15             IOutboundMessage message = default;16             var number = 0;17             try 18             {19                 while (number < 4 && reader.TryRead(out message) && message != null) 20                 {21                     Interlocked.Decrement(ref this.queueCount);22                     var msg = message.Inner as IMessage;23                     var buffer = msg.ToByteBuffer(allocator);24                     channel.WriteAsync(buffer);25                     number++;26                 }27                 channel.Flush();28                 number = 0;29             }30             catch (Exception e)  when(message != default)31             {32                 logger.LogError("SendOutboundMessage Fail, SessionID:{0}, Exception:{1}",33                     this.sessionID, e.Message);34                 this.messageCenter.OnMessageFail(message);35             }36         }37         this.logger.LogInformation("SessionID:{0}, SendingLoop Exit", this.sessionID);38     });39 }

第19-27行是關(guān)鍵, 這邊每4個(gè)包做一下flush, 然后flush會(huì)觸發(fā)DotNetty的DoWrite:

 1 protected override void DoWrite(ChannelOutboundBuffer input) 2 { 3     List<ArraySegment<byte>> sharedBufferList = null; 4     try 5     { 6         while (true) 7         { 8             int size = input.Size; 9             if (size == 0)10             {11                 // All written12                 break;13             }14             long writtenBytes = 0;15             bool done = false;16 17             // Ensure the pending writes are made of ByteBufs only.18             int maxBytesPerGatheringWrite = ((TcpSocketChannelConfig)this.config).GetMaxBytesPerGatheringWrite();19             sharedBufferList = input.GetSharedBufferList(1024, maxBytesPerGatheringWrite);20             int nioBufferCnt = sharedBufferList.Count;21             long expectedWrittenBytes = input.NioBufferSize;22             Socket socket = this.Socket;23 24             List<ArraySegment<byte>> bufferList = sharedBufferList;25             // Always us nioBuffers() to workaround data-corruption.26             // See https://github.com/netty/netty/issues/276127             switch (nioBufferCnt)28             {29                 case 0:30                     // We have something else beside ByteBuffers to write so fallback to normal writes.31                     base.DoWrite(input);32                     return;33                 default:34                     for (int i = this.Configuration.WriteSpinCount - 1; i >= 0; i--)35                     {36                         long localWrittenBytes = socket.Send(bufferList, SocketFlags.None, out SocketError errorCode);37                         if (errorCode != SocketError.Success && errorCode != SocketError.WouldBlock)38                         {39                             throw new SocketException((int)errorCode);40                         }

DotNetty TcpSocketChannel類的DoWrite函數(shù), 19行獲取當(dāng)前ChannelOutboundBuffer的Segment<byte>數(shù)組, 然后在36行調(diào)用Socket.Send一次性發(fā)出去, 這個(gè)是Gathering Write的關(guān)鍵. 有了這個(gè), 就可以不在業(yè)務(wù)層用CompositeByteBuffer.

DotNetty Libuv Transport的實(shí)現(xiàn)可以看6, 思想是類似的.

 

實(shí)際上Orleans 3.x做的網(wǎng)絡(luò)優(yōu)化, 也有類似的思想:

 1 private async Task ProcessOutgoing() 2 { 3     await Task.Yield(); 4  5     Exception error = default;    6     PipeWriter output = default; 7     var serializer = this.serviceProvider.GetRequiredService<IMessageSerializer>(); 8     try 9     {10         output = this.Context.Transport.Output;11         var reader = this.outgoingMessages.Reader;12         if (this.Log.IsEnabled(LogLevel.Information))13         {14             this.Log.LogInformation(15                 "Starting to process messages from local endpoint {Local} to remote endpoint {Remote}",16                 this.LocalEndPoint,17                 this.RemoteEndPoint);18         }19 20         while (true)21         {22             var more = await reader.WaitToReadAsync();23             if (!more)24             {25                 break;26             }27 28             Message message = default;29             try30             {31                 while (inflight.Count < inflight.Capacity && reader.TryRead(out message) && this.PrepareMessageForSend(message))32                 {33                     inflight.Add(message);34                     var (headerLength, bodyLength) = serializer.Write(ref output, message);35                     MessagingStatisticsGroup.OnMessageSend(this.MessageSentCounter, message, headerLength + bodyLength, headerLength, this.ConnectionDirection);36                 }37             }38             catch (Exception exception) when (message != default)39             {40                 this.OnMessageSerializationFailure(message, exception);41             }42 43             var flushResult = await output.FlushAsync();44             if (flushResult.IsCompleted || flushResult.IsCanceled)45             {46                 break;47             }48 49             inflight.Clear();50         }

核心在31行, 開始寫, 43行開始flush, 只不過Orleans用的pipelines io, DotNetty是傳統(tǒng)模型.

 

這樣做, 可以在有限的pps下, 支撐更高的吞吐量.

 

個(gè)人感覺DotNetty更好用一些.

 

參考:

1. https://github.com/Azure/DotNetty/blob/dev/src/DotNetty.Transport/Channels/Sockets/TcpSocketChannel.cs#L271-L288

2. https://github.com/dotnet/orleans/blob/master/src/Orleans.Core/Networking/Connection.cs#L282-L294

3. https://docs.microsoft.com/zh-cn/windows/win32/winsock/scatter-gather-i-o-2

4. https://linux.die.net/man/2/writev

5. https://github.com/chenshuo/muduo/blob/d980315dc054b122612f423ee2e1316cb14bd3b5/muduo/net/Buffer.cc#L28-L38

6. https://github.com/Azure/DotNetty/blob/dev/src/DotNetty.Transport.Libuv/Native/WriteRequest.cs#L106-L128

 

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊舉報(bào)
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
推薦5個(gè)漲知識工具,開啟很新的閱讀方式
RESTful API 設(shè)計(jì)最佳實(shí)踐
使用HTML5來實(shí)現(xiàn)本地文件讀取和寫入
把代碼貼進(jìn)去自動(dòng)找bug,這個(gè)debug神器自動(dòng)修復(fù)僅需幾秒,還有GPT-3在線解惑
認(rèn)識HTML5的WebSocket
Message Port: Communicating with Other Applications
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服