上篇的例子 , 如果使用 thread pool 而不是 每個 client 一個 threads ?!
這樣做的話 , 每個 client 只要一個結構去管理 , thread pool 固定去看 shared memory 以及
結構中的 counter 去看有無更新行情要傳送 client !!!!
這個方法讓一支程式可以處理大量 clients , 同樣不用任何 lock !!!!!
程式稍複雜些 :
void * doit(void *arg)
{
pthread_detach(pthread_self());
int fd = 11 ;
while(1)
{
usleep(1000) ;
for( fd=0;fd<THREADNUM;fd++ ){
if(ThreadArray[fd].getWantFlag() != 1){
if( ThreadArray[fd].getStkidCnt() > 0 )
ThreadArray[fd].eraseall() ;
ThreadArray[fd].setRealFlag(0) ;
continue ;
}
string s ;
for(int idx=0;idx<ThreadArray[fd].getStkidCnt();idx++){
try{
s = ThreadArray[fd][idx] ;
}catch(...){}
uint sharedMemIdx ;
if( hashtable.findkey( s.c_str(),sharedMemIdx ) ){
;
}else
continue ;
datafeed localdata ;
uint MapIdx ;
if( ThreadArray[fd].getMap().findkey( s.c_str(),MapIdx ) ){
int iret = -1 ;
iret = getFeedData( sharedMemIdx,MapIdx,localdata ) ;
if( iret >= 0 ){
ThreadArray[fd].getMap()[s.c_str()] = iret ;
processFeed( fd,localdata ) ;
}
//cout << iret << "***" << MapIdx << endl ;
}else{
int iret = -1 ,iSeqno = -1 ;
iret = getFeedData( sharedMemIdx,iSeqno,localdata ) ;
if( iret >= 0 ){
ThreadArray[fd].getMap()[s.c_str()] = iret ;
processFeed( fd,localdata ) ;
}
//cout << iret << "!!!" << endl ;
}
} //for
while(1){
string str ;
if(mqcounter[fd]==0)
break ;
cout << "mqcounter[fd]=" << mqcounter[fd] << endl ;
if( !mqptr[fd]->dequeue(str) )
break ;
cout << "(" << str << ") dequeue" << endl ;
if( str == "" )
break ;
string s(str,1,1024) ;
if(str[0]=='N'){
cout << s << " inserted " << endl ;
ThreadArray[fd].insertStkid(s) ;
}
if(str[0]=='D'){
cout << s << " deleted " << endl ;
ThreadArray[fd].deleteStkid(s) ;
}
__sync_sub_and_fetch(&mqcounter[fd],1) ;
} //while
} //for ( fd=0;fd<THREADNUM;fd++ )
} //endless while
printf("Thread...(%d) thread closed \n",fd) ;
return(NULL);
} //doit
留言列表