要做分析網(wǎng)絡(luò)數(shù)據(jù)包的項(xiàng)目,我寫(xiě)好的個(gè)多線程的環(huán)形緩沖,自己試了半天,好像沒(méi)錯(cuò)誤,大家?guī)兔聪?。我沒(méi)有用鎖,鎖對(duì)性能影響好像很大。但我覺(jué)得還有可以優(yōu)化的地方。
背景要求:將捕獲到的包存入環(huán)形緩沖的一個(gè)單元,如果緩沖緩沖滿了(寫(xiě)線程快讀線程一圈),就開(kāi)一個(gè)是原來(lái)兩倍大小新環(huán)形緩沖。讀線程在處理完舊緩沖緩沖后,釋放舊緩沖空間。
兩個(gè)線程,生產(chǎn)者和消費(fèi)者問(wèn)題。
先說(shuō)數(shù)據(jù)結(jié)構(gòu):
環(huán)形緩沖中有很多單元,每一個(gè)單元用于儲(chǔ)存一個(gè)包的數(shù)據(jù)。線程可以同時(shí)訪問(wèn)環(huán)形緩沖這個(gè)大數(shù)據(jù)結(jié)構(gòu),但不能同時(shí)訪問(wèn)某一個(gè)緩沖單元。所以我在每個(gè)單元里加了個(gè)標(biāo)志,0表明可寫(xiě),1表明可讀。初始為0.這樣就不需要用到鎖或信號(hào)量。 當(dāng)標(biāo)志為0,寫(xiě)線程可以訪問(wèn),寫(xiě)完數(shù)據(jù)后,將標(biāo)志改為1(都沒(méi)使用原子操作),并跳入下一個(gè)單元。讀線程在標(biāo)志為1的時(shí)候,讀取其中的數(shù)據(jù),讀完后將標(biāo)志改為0.
/*Cirbuf chain 寫(xiě)線程可能很快 以至于不只有1個(gè)或2個(gè)環(huán)形緩沖,可能有多個(gè)*/ struct Cirbuf* packetCirbuf;
/*環(huán)形緩沖的一個(gè)單元*/ struct CirUnit { int flag; /*0 表明可寫(xiě),1表明可讀*/ char unit[UNITSIZE]; };
/*環(huán)形緩沖*/ struct Cirbuf { struct Cirbuf * newBuf; /*point to new Cirbuf*/ int readPoint; /*for reading thread*/ int writePoint; /*for writing thread*/ int bufSize; /*buf char array's size*/ struct CirUnit *buf; /*point to unit array*/ }; |
讀寫(xiě)線程
對(duì)于寫(xiě)線程:
如果寫(xiě)線程發(fā)現(xiàn)標(biāo)志為0,這此緩沖單元可寫(xiě),將數(shù)據(jù)寫(xiě)入。
如果寫(xiě)線程發(fā)現(xiàn)當(dāng)前標(biāo)志為1,就表明已經(jīng)追上讀線程,于是要新開(kāi)緩沖空間。并把新空間加入到環(huán)形緩沖區(qū)鏈表中。
對(duì)于讀線程:
如果發(fā)現(xiàn)標(biāo)志位1,則單元可讀,讀出數(shù)據(jù),并把標(biāo)志改為0.
如果讀線程發(fā)現(xiàn)當(dāng)前標(biāo)志為0,就表明 (1).追上了寫(xiě)線程 (2)或舊緩沖區(qū)所有單元已經(jīng)被讀線程處理完,而此時(shí)寫(xiě)線程已經(jīng)在處理新緩沖區(qū)。 對(duì)于這兩種情況的分辨,只要看下面Cirbuf結(jié)構(gòu)的Cirbuf * newBuf 指針,如果指針為空,表明為第一種情況,否則為第二種。對(duì)于第二種情況,需要釋放舊的環(huán)形緩沖區(qū)空間,并將指針指向新的環(huán)形緩沖區(qū)。
讀線程在數(shù)據(jù)單元不可用時(shí)(讀線程追上寫(xiě)線程),我最先寫(xiě)代碼時(shí)是讓他continue while的循環(huán),后來(lái)改成了讓他delay 50ms,這里不知道怎么處理好。
extern "C" void* ReadThread(void *arg) { while(1) { if(packetCirbuf->buf[packetCirbuf->readPoint].flag == 0) { if(packetCirbuf->newBuf != NULL) { /*表明readthread已經(jīng)處理完舊的緩沖區(qū)并且已經(jīng)有新的緩沖區(qū),這時(shí)應(yīng)該釋放舊緩沖*/ struct Cirbuf *temp = packetCirbuf; packetCirbuf = packetCirbuf->newBuf; FreeCirbuf(temp); continue; } /* delay*/ pthread_cond_t mycond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER; struct timespec ts; int rv; ts.tv_sec = 0; ts.tv_nsec = 50000; /* 500,00 nanoseconds = 50 ms */ pthread_mutex_lock(&mymutex); rv = pthread_cond_timedwait(&mycond, &mymutex, &ts); pthread_mutex_unlock(&mymutex);
continue; } DoRead(); packetCirbuf->buf[packetCirbuf->readPoint].flag = 0; if(++packetCirbuf->readPoint == packetCirbuf->bufSize) packetCirbuf->readPoint = 0; } return NULL; }
extern "C" void* WriteThread(void *arg) { struct Cirbuf* latestBuf; /*進(jìn)寫(xiě)線程后,初始化latestBuf指針 使其指向packetCirbuf,這時(shí)的packetCirbuf是由主線程函數(shù)創(chuàng)建的初始buf*/ latestBuf = packetCirbuf; if(latestBuf == NULL) { cout << "packet capture buffer NULL error" << endl; exit(0); } /*進(jìn)入循環(huán)*/ char test=0; while(1) { if (latestBuf->buf[latestBuf->writePoint].flag == 1) { /*we need a larger buf*/ latestBuf = GetNewCirbuf(latestBuf->bufSize * 2,latestBuf); } else { DoWrite(latestBuf,test++); latestBuf->buf[latestBuf->writePoint].flag = 1; if(++latestBuf->writePoint == latestBuf->bufSize) latestBuf->writePoint = 0; } } return NULL; } |
不需要用鎖的原因是 數(shù)據(jù)競(jìng)爭(zhēng)發(fā)生在flag內(nèi)存單元,
讀線程如果取得flag為 1,但線程調(diào)度或另一個(gè)CPU核上的線程將flag改為0,就會(huì)發(fā)生錯(cuò)誤。但只有讀線程自己會(huì)把flag從1變?yōu)?,所以不會(huì)出錯(cuò) 同理寫(xiě)進(jìn)程也是。
程序應(yīng)該有優(yōu)化的地方 比如讀進(jìn)程追上寫(xiě)進(jìn)程的時(shí)候,是阻塞50ms還是直接continue還是有其他方法之類。
我測(cè)試是寫(xiě)進(jìn)程往緩沖寫(xiě)數(shù)字1 2 3 讀進(jìn)程打印出來(lái),打印的結(jié)果沒(méi)有錯(cuò)序,也沒(méi)有缺數(shù)發(fā)生。
整個(gè)程序代碼:
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <iostream> #include <thread.h> #include <time.h>
#define INITBUFSIZE 10 /*環(huán)型緩沖區(qū)初始單元個(gè)數(shù)*/ #define UNITSIZE 200/*每單元大小*/
using namespace std; /*global data*/ /*Cirbuf chain 寫(xiě)進(jìn)程可能很快 以至于不只有1個(gè)或2個(gè)環(huán)形緩沖,可能有多個(gè)*/ struct Cirbuf* packetCirbuf;
/*環(huán)形緩沖的一個(gè)單元*/ struct CirUnit { int flag; /*0 表明可寫(xiě),1表明可讀*/ char unit[UNITSIZE]; };
/*環(huán)形緩沖*/ struct Cirbuf { struct Cirbuf * newBuf; /*point to new Cirbuf*/ int readPoint; /*for reading thread*/ int writePoint; /*for writing thread*/ int bufSize; /*buf char array's size*/ struct CirUnit *buf; /*point to unit array*/ };
struct Cirbuf* GetNewCirbuf(int bufSize,struct Cirbuf* oldBuf) { if(bufSize>30000) { printf("oh my god,the bufSize is out of my league,I cannot handle it,exit"); exit(1); } struct Cirbuf* newBuf = new Cirbuf(); if(oldBuf != NULL) oldBuf->newBuf = newBuf; newBuf->newBuf = NULL; newBuf->readPoint = newBuf->writePoint = 0; newBuf->bufSize = bufSize; newBuf->buf = new CirUnit[bufSize]; memset(newBuf->buf,0,sizeof(CirUnit)*bufSize); /*初始化單元為0*/ return newBuf; }
int FreeCirbuf(struct Cirbuf* bufPoint) { delete bufPoint->buf; delete bufPoint; return 1; }
void DoWrite(struct Cirbuf* latestBuf,char flag) { latestBuf->buf[latestBuf->writePoint].unit[0] = flag; //printf("%d ",flag); }
void DoRead() { //cout<< packetCirbuf->buf[packetCirbuf->readPoint].unit[0] << endl; //printf("%d ",packetCirbuf->buf[packetCirbuf->readPoint].unit[0]); //printf("."); }
extern "C" void* ReadThread(void *arg) { while(1) { if(packetCirbuf->buf[packetCirbuf->readPoint].flag == 0) { if(packetCirbuf->newBuf != NULL) { /*表明readthread已經(jīng)處理完舊的緩沖區(qū)并且已經(jīng)有新的緩沖區(qū),這時(shí)應(yīng)該釋放舊緩沖*/ struct Cirbuf *temp = packetCirbuf; packetCirbuf = packetCirbuf->newBuf; FreeCirbuf(temp); continue; } /* delay*/ pthread_cond_t mycond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER; struct timespec ts; int rv; ts.tv_sec = 0; ts.tv_nsec = 50000; /* 500,00 nanoseconds = 50 ms */ pthread_mutex_lock(&mymutex); rv = pthread_cond_timedwait(&mycond, &mymutex, &ts); pthread_mutex_unlock(&mymutex);
continue; } DoRead(); packetCirbuf->buf[packetCirbuf->readPoint].flag = 0; if(++packetCirbuf->readPoint == packetCirbuf->bufSize) packetCirbuf->readPoint = 0; } return NULL; }
extern "C" void* WriteThread(void *arg) { struct Cirbuf* latestBuf; /*進(jìn)寫(xiě)進(jìn)程后,初始化latestBuf指針 使其指向packetCirbuf,這時(shí)的packetCirbuf是由主線程函數(shù)創(chuàng)建的初始buf*/ latestBuf = packetCirbuf; if(latestBuf == NULL) { cout << "packet capture buffer NULL error" << endl; exit(0); } /*進(jìn)入循環(huán)*/ char test=0; while(1) { if (latestBuf->buf[latestBuf->writePoint].flag == 1) { /*we need a larger buf*/ latestBuf = GetNewCirbuf(latestBuf->bufSize * 2,latestBuf); } else { DoWrite(latestBuf,test++); latestBuf->buf[latestBuf->writePoint].flag = 1; if(++latestBuf->writePoint == latestBuf->bufSize) latestBuf->writePoint = 0; } } return NULL; }
int main(int argc, char *argv[]) { pthread_t threadNum[2]; packetCirbuf = GetNewCirbuf(INITBUFSIZE,NULL); thr_create(0,0,WriteThread,0,0,&threadNum[0]); thr_create(0,0,ReadThread,0,0,&threadNum[1]); thr_join(threadNum[0],NULL,NULL); thr_join(threadNum[1],NULL,NULL); FreeCirbuf(packetCirbuf); }
|