因為 clients 超過 1000 個,所以使用 event 方式而不產生 threads, 每個 client 都將自己
想要看的股票代碼傳進來 , 此時使用 lockfreequeue 把 client 欲刪除/增加 的股票代碼 傳進去每個
股票的 map , map 維護該股票需要傳送行情的 client file descriptor , 每支股票都有自己的 lockfreequeue,
所以當要傳送行情前 , 先將 lockfreequeue 裡面 要新增/刪除 的 fd 先維護 map , 之後開始傳送 ,
因為中間過程 都是 lock free , 速度上當然快, 而 lockfreequeue 使用的是 unbounded buffer ,
所以 不像 bounded buffer 方式 implement 那麼需要浪費記憶體 !!!!!!
所以 , 你如需要一萬個clients收行情 , 把行情放在 shared memory , 跑十支這樣的程式 , 應該可行 !!
目前用 20 檔股票代碼寫簡單的運作,真實環境可以到 10000 檔股票代碼 , 10000 檔 股票 不可能用
loop 去找該股票的 array index , 目前 prefer 使用 skip list , thread pool 方面 , 假設有
10 個 thread 服務 10000 股股票 , 也就是每個 thread 要傳送 1000 檔股票的行情 , 最簡單方式就是 polling,
有新行情就傳送到 map 裡面的 fd ,沒有就跳過 , 另外可以用一個結構更新每檔股票的傳送序號,有新序號才送 ,
此時不宜使用 queue , 因為 假設 queue 有以下順序 : 2330->2301->2330->1101->2330->1102->2330
你使用 queue , 2330 要控制只傳一次要花點時間設計,就算使用 priority queue 也需要避免 2330 被不同 thread傳送多次,
polling 以及結構更新序號讓程式好維護很多 !!!!!
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/event-config.h>
#include <stdarg.h>
#include <event2/util.h>
#include <event2/buffer.h>
#include "../DoubleWordCas/lockfreequeue.hpp"
using namespace std ;
#define LISTEN_PORT 4444
#define LISTEN_BACKLOG 32
void do_accept(evutil_socket_t listener, short event, void *arg);
void read_cb(struct bufferevent *bev, void *arg);
void error_cb(struct bufferevent *bev, short event, void *arg);
void write_cb(struct bufferevent *bev, void *arg);
void DoStkidVec(int,char *) ;
#define MAXCLIENT 1024
struct stkidvec_
{
vector<string> v_old ;
vector<string> v_new ;
} ;
typedef struct stkidvec_ stkidvec ;
stkidvec clientarray[MAXCLIENT] ;
class queuedata
{
public:
queuedata()
{
strcpy(qdata,"") ;
}
queuedata(char* s)
{
strcpy(qdata,s) ;
}
const char* getdata() const { return qdata ; }
queuedata& operator=(const queuedata& q) //copy assignment
{
strcpy(qdata,q.getdata()) ;
return *this ;
}
private :
char qdata[128] ;
} ;
#define MAXSTKID 20
LockFreeQueue<queuedata> lfqueue[MAXSTKID] ;
map<int,int> StkidMap[MAXSTKID] ;
struct datafeed_
{
char stkid[11] ;
double bestbuyprc1 ;
double bestsellprc1 ;
int volume ;
} ;
typedef struct datafeed_ datafeed ;
datafeed DataFeed[MAXSTKID] ;
void InitDataFeed(void)
{
int idx,idy,idz ;
char strtmp[128] ;
for(idx=0;idx<MAXSTKID;idx++)
{
sprintf(strtmp,"00%02d",idx) ;
strtmp[4]=0x00 ;
strcpy(DataFeed[idx].stkid,strtmp) ;
DataFeed[idx].bestbuyprc1 = (idx + 1) * 0.1 ;
DataFeed[idx].bestsellprc1 = (idx + 1) * 0.15 ;
DataFeed[idx].volume = (idx + 1) * 1000 ;
} //for
}
int LocateLockFreeQueue(const string& stkid)
{
int istkid ;
istkid = atoi(stkid.c_str()) ;
if( (istkid >= MAXSTKID) || (istkid < 0) )
return -1 ;
return istkid ;
}
void *funcqueue(void *arg)
{
int threadid = (int)(long) arg;
int idx,idy,idz ;
pthread_detach(pthread_self());
while(1)
{
for(idx=0;idx<MAXSTKID;idx++)
{
queuedata q ;
char action[8],stkid[18] ;
int fd ;
while(lfqueue[idx].queue_pop(q) > 0)
{
const char *ptr = q.getdata() ;
sscanf(ptr,"%s %d %s",action,&fd,stkid) ;
//printf("(%s)(%d)(%s) poped in (%d)\n",action,fd,stkid,idx ) ;
if(strncmp(action,"A",1)==0)
{
StkidMap[idx].insert(pair<int,int>(fd,fd) ) ;
}else if(strncmp(action,"D",1)==0){
StkidMap[idx].erase(fd) ;
}
} //while
std::map<int,int>::iterator it ;
for (it=StkidMap[idx].begin(); it!=StkidMap[idx].end(); ++it)
{
//printf("To (%d) send out idx=(%d)\n",it->second,idx);
char data[128] ;
sprintf(data,"%10s|%05.2f|%05.2f|%07d|",DataFeed[idx].stkid,
DataFeed[idx].bestbuyprc1,DataFeed[idx].bestsellprc1,
DataFeed[idx].volume ) ;
//printf("To (%d) send out(%s)\n",it->second,data) ;
send(it->second,data,strlen(data),MSG_NOSIGNAL) ;
}
//printf("====================================== \n") ;
} // for
usleep(1000) ;
} //while
} //funcqueue
int main(int argc, char *argv[])
{
int ret;
evutil_socket_t listener;
listener = socket(AF_INET, SOCK_STREAM, 0);
assert(listener > 0);
evutil_make_listen_socket_reuseable(listener);
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(LISTEN_PORT);
int idx,idy,idz ;
InitDataFeed() ;
pthread_t tqueue ;
int iCPU = 0 ;
pthread_create(&tqueue,NULL,funcqueue,(void *)(long)iCPU );
if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
perror("bind");
return 1;
}
if (listen(listener, LISTEN_BACKLOG) < 0) {
perror("listen");
return 1;
}
printf ("Listening...\n");
evutil_make_socket_nonblocking(listener);
struct event_base *base = event_base_new();
assert(base != NULL);
struct event *listen_event;
listen_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
event_add(listen_event, NULL);
event_base_dispatch(base);
printf("The End.");
return 0;
}
void do_accept(evutil_socket_t listener, short event, void *arg)
{
struct event_base *base = (struct event_base *)arg;
evutil_socket_t fd;
struct sockaddr_in sin;
socklen_t slen;
fd = accept(listener, (struct sockaddr *)&sin, &slen);
if (fd < 0) {
perror("accept");
return;
}
if (fd > FD_SETSIZE) {
perror("fd > FD_SETSIZE\n");
return;
}
printf("ACCEPT: fd = %u\n", fd);
int idx,idy,idz ;
struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, read_cb, NULL, error_cb, arg);
bufferevent_enable(bev, EV_READ|EV_PERSIST);
}
void read_cb(struct bufferevent *bev, void *arg)
{
char buflen[5],record[256] ;
int idy,n;
evutil_socket_t fd = bufferevent_getfd(bev);
struct evbuffer *input = bufferevent_get_input(bev);
int datalen = 0 ;
size_t len1 = evbuffer_get_length(input);
if(len1 < 4)
return ;
evbuffer_copyout(input, buflen, 4) ;
buflen[4]=0x00 ;
if( (buflen[0] < '0') || (buflen[0] > '9') ||
(buflen[1] < '0') || (buflen[1] > '9') ||
(buflen[2] < '0') || (buflen[2] > '9') ||
(buflen[3] < '0') || (buflen[3] > '9') )
{
size_t lentmp = evbuffer_get_length(input);
evbuffer_drain(input,lentmp) ;
return ;
}
datalen = atoi(buflen) ;
size_t len2 = evbuffer_get_length(input);
if(len2 < (4+datalen) )
return ;
evbuffer_drain(input, 4);
evbuffer_remove(input,record,datalen) ;
record[datalen] = 0x00 ;
//printf("(%d)(%s)\n",fd,record) ;
DoStkidVec(fd,record) ;
}
void write_cb(struct bufferevent *bev, void *arg) {}
void error_cb(struct bufferevent *bev, short event, void *arg)
{
evutil_socket_t fd = bufferevent_getfd(bev);
printf("fd = %u, ", fd);
if (event & BEV_EVENT_TIMEOUT) {
printf("Timed out\n"); //if bufferevent_set_timeouts() called
}
else if (event & BEV_EVENT_EOF) {
for(int i=0;i<clientarray[fd].v_old.size();i++)
{
int iarr = LocateLockFreeQueue(clientarray[fd].v_old[i]) ;
if(iarr >= 0)
{
char str[128] ;
sprintf(str,"%s %d %s","D",fd,clientarray[fd].v_old[i].c_str());
//cout <<"iarr=(" << iarr << ")(" << str << ")" << endl ;
queuedata q(str) ;
lfqueue[iarr].queue_push(q) ;
}
//cout << clientarray[fd].v_old[i] << "removed" << endl ;
} //for
clientarray[fd].v_old.clear() ;
clientarray[fd].v_new.clear() ;
printf("connection closed\n");
}
else if (event & BEV_EVENT_ERROR) {
printf("some other error\n");
}
bufferevent_free(bev);
}
void DoStkidVec(int fd,char *record)
{
clientarray[fd].v_new.clear() ;
char *pch ;
pch = strtok(record,"@") ;
while(pch != NULL)
{
//printf("(%s)\n",pch) ;
string s(pch) ;
clientarray[fd].v_new.push_back(s) ;
pch = strtok(NULL,"@") ;
} //while
for(int i=0;i<clientarray[fd].v_new.size();i++)
{
vector<string>::iterator it;
it = find(clientarray[fd].v_old.begin(),clientarray[fd].v_old.end(),clientarray[fd].v_new[i] ) ;
if( it == clientarray[fd].v_old.end() )
{
int iarr = LocateLockFreeQueue(clientarray[fd].v_new[i]) ;
if(iarr >= 0)
{
char str[128] ;
sprintf(str,"%s %d %s","A",fd,clientarray[fd].v_new[i].c_str());
//cout <<"iarr=(" << iarr << ")(" << str << ")" << endl ;
queuedata q(str) ;
lfqueue[iarr].queue_push(q) ;
}
//cout << clientarray[fd].v_new[i] << "added" << endl ;
}
} //for
//cout << endl ;
for(int i=0;i<clientarray[fd].v_old.size();i++)
{
vector<string>::iterator it;
it = find(clientarray[fd].v_new.begin(),clientarray[fd].v_new.end(),clientarray[fd].v_old[i] ) ;
if( it == clientarray[fd].v_new.end() )
{
int iarr = LocateLockFreeQueue(clientarray[fd].v_old[i]) ;
if(iarr >= 0)
{
char str[128] ;
sprintf(str,"%s %d %s","D",fd,clientarray[fd].v_old[i].c_str());
//cout <<"iarr=(" << iarr << ")(" << str << ")" << endl ;
queuedata q(str) ;
lfqueue[iarr].queue_push(q) ;
}
//cout << clientarray[fd].v_old[i] << "removed" << endl ;
}
} //for
//cout << "=================================" << endl ;
clientarray[fd].v_old = clientarray[fd].v_new ;
}
=================================================
Client :
void *thread2(void *param)
{
char strdata[81] ;
int icnt = 0,nread=0 ;
char stkid[11] ;
int volume ;
double bestbuyprc1,bestsellprc1 ;
char strarr[4][20] ;
for ( ; ; )
{
memset(strdata,0x00,sizeof(strdata) ) ;
if ( (nread = recv(sd,strdata,31,MSG_NOSIGNAL)) <= 0)
{
break;
}
strdata[31]=0x00 ;
//printf("recieve(%s)\n",strdata) ;
memset(stkid,0x00,sizeof(stkid)) ;
sscanf(strdata, "%[^'|']|%[^'|']|%[^'|']|%[^'|']|",
&strarr[0],&strarr[1],&strarr[2],&strarr[3] ) ;
//printf("(%s)(%s)(%s)(%s)\n",strarr[0],strarr[1],strarr[2],strarr[3]);
}
}
void *thread1(void *param)
{
char strx[128] ;
int idx,inum = 0 ;
while(1)
{
inum++ ;
if( (inum%3) == 0){
strcpy(strx,"0030") ;
strcat(strx,"@0001@0002@0003") ;
strcat(strx,"@0004@0005@0006") ;
}else if( (inum%3) == 1){
strcpy(strx,"0040") ;
strcat(strx,"@0007@0002@0003") ;
strcat(strx,"@0004@0005@0008") ;
strcat(strx,"@0014@0015") ;
}else{
strcpy(strx,"0045") ;
strcat(strx,"@0001@0002@0003") ;
strcat(strx,"@0004@0005@0016") ;
strcat(strx,"@0019@0014@0017") ;
}
printf("(%d)(%s)\n",strlen(strx),strx) ;
if(send(sd,strx,strlen(strx),MSG_NOSIGNAL) <=0)
exit(0) ;
sleep(10) ;
}
}
===============================================
compile :
g++ --std=c++0x datafeedserver.cpp -levent -lpthread -o datafeedserver.exe
g++ --std=c++0x datafeedclient.cpp -levent -o datafeedclient.exe
留言列表