上篇的例子，如果改用 thread pool，而不是每個 client 各開一個 thread，會怎麼樣呢？

這樣做的話，每個 client 只需要一個結構來管理；thread pool 中的 thread 會固定去檢查 shared memory，以及

各 client 結構中的 counter，判斷是否有更新的行情需要傳送給 client。

 

這個方法讓一支程式可以處理大量 clients，而且同樣不需要任何 lock。

 

程式稍微複雜一些：

 

void * doit(void *arg)
{
    pthread_detach(pthread_self());
    int fd = 11 ;

    while(1)
    {
        usleep(1000) ;
        for( fd=0;fd<THREADNUM;fd++ ){
            if(ThreadArray[fd].getWantFlag() != 1){
              
                if( ThreadArray[fd].getStkidCnt() > 0 )
                    ThreadArray[fd].eraseall() ;
                ThreadArray[fd].setRealFlag(0) ;
                continue  ;
            }
            string s ;
            for(int idx=0;idx<ThreadArray[fd].getStkidCnt();idx++){
                try{
                    s = ThreadArray[fd][idx] ;
                }catch(...){}

                uint sharedMemIdx   ;
                if( hashtable.findkey( s.c_str(),sharedMemIdx ) ){
                    ;
                }else
                    continue ;

                datafeed localdata ;
                uint MapIdx ;
                if( ThreadArray[fd].getMap().findkey( s.c_str(),MapIdx ) ){
                    int iret = -1  ;
                    iret = getFeedData( sharedMemIdx,MapIdx,localdata ) ;
                    if( iret >= 0 ){
                        ThreadArray[fd].getMap()[s.c_str()] = iret ;
                        processFeed( fd,localdata ) ;
                    }
                    //cout << iret << "***" << MapIdx << endl ;
                }else{
                    int iret = -1 ,iSeqno = -1 ;
                    iret = getFeedData( sharedMemIdx,iSeqno,localdata ) ;
                    if( iret >= 0 ){
                        ThreadArray[fd].getMap()[s.c_str()] = iret ;
                        processFeed( fd,localdata ) ;
                    }
                    //cout << iret << "!!!" << endl ;
                }
            } //for

            while(1){
                string str ;
                if(mqcounter[fd]==0)
                    break ;
                cout << "mqcounter[fd]=" << mqcounter[fd] << endl ;
                if( !mqptr[fd]->dequeue(str) )
                    break ;
                cout << "(" << str << ") dequeue" << endl ;
                if( str == "" )
                    break ;

                string s(str,1,1024) ;
                if(str[0]=='N'){
                    cout << s << " inserted " << endl ;
                    ThreadArray[fd].insertStkid(s) ;
                }
                if(str[0]=='D'){
                    cout << s << " deleted " << endl ;
                    ThreadArray[fd].deleteStkid(s) ;
                }
                __sync_sub_and_fetch(&mqcounter[fd],1) ;
            } //while
        } //for ( fd=0;fd<THREADNUM;fd++ )
    } //endless  while
    printf("Thread...(%d) thread closed \n",fd) ;
    return(NULL);
} //doit
               

 

 

arrow
arrow
    全站熱搜

    hedgezzz 發表在 痞客邦 留言(0) 人氣()