1.  像資料庫一樣,使用一個 port listen , 當接收到  連線時 , 立刻使用 pass fd through unix domain

將 fd 轉到其他 process  ,  例如 50支程式 , 每支 500 threads , 可以服務兩萬五千個 clients ,

平均分給  50 支程式服務 , resource 越多 , 可以執行程式越多,服務越多 , 行情程式一份 ,所以程式全都

去 shared memory 讀取....!!

 

2. 真正服務的 socket server 透過 Unix domain 收到 fd , 可以使用 libevent 將 server 設計成

non-blocking server ,  這樣 client 要傳任何值互動變得容易 , 最重要的是 , libevent 幫你管理 socket buffer ,

你的每個封包 , 前面四碼是表示封包長度的這種應用 , non-blocking 需要花結構管理且易出錯 , 然而 libevent

讓這個工作變得相當容易 .

 

3. 使用 StringHashTable 來存取 shared memory location !!! 定位每秒上千萬次 , 使用 seqlock

可從 shared memory copy 至 local , 每秒 50 萬次 !!!

 

4.  對每個 fd create thread  , 這讓程式很好管理 , 程式容易維護且不影響速度 .

5.  process/process , thread/thread 完全不用 lock ,所以就算 socket

程式執行一百支,也不影響 行情更新performance  ,就算 read/write lock

也是使用到 lock ,seqlock 是使用 lock-free 的方法 , 它被應用在 linux kernel ,

只能使用在 Intel , AMD 這種 strong memory model 的 cpu.

 

以上步驟可建立 datafeed socket server !!!....且服務大量的 clients , 而且  clients 可任意 增減 股票代碼

無須重新連線  !!!!!

 

程式碼片段 :

void * doit(void *arg)
{
    pthread_detach(pthread_self());
    int fd = (int)(arg) ;
    ThreadArray[fd].setWantFlag(1) ;
    ThreadArray[fd].setRealFlag(1) ;

    StringHashTable map(512) ;

    while(1)
    {
        if(ThreadArray[fd].getWantFlag() != 1)
            break ;
        usleep(1000) ;
        string s ;
        for(int idx=0;idx<ThreadArray[fd].getStkidCnt();idx++){
            try{
                s = ThreadArray[fd][idx] ;
            }catch(...){}

            uint sharedMemIdx   ;
            if( hashtable.findkey( s.c_str(),sharedMemIdx ) ){
                ;
            }else
                continue ;

            datafeed localdata ;
            uint MapIdx ;
            if( map.findkey( s.c_str(),MapIdx ) ){
                int iret = -1  ;
                iret = getFeedData( sharedMemIdx,MapIdx,localdata ) ;
                if( iret >= 0 ){
                    map[s.c_str()] = iret ;
                    processFeed( fd,localdata ) ;
                }
                //cout << iret << "***" << MapIdx << endl ;
            }else{
                int iret = -1 ,iSeqno = -1 ;
                iret = getFeedData( sharedMemIdx,iSeqno,localdata ) ;
                if( iret >= 0 ){
                    map[s.c_str()] = iret ;
                    processFeed( fd,localdata ) ;
                }
            }
        } //for

        while(1){
            string str ;
            if(mqcounter[fd]==0)
                break ;
            cout << "mqcounter[fd]=" << mqcounter[fd] << endl ;
            if( !mqptr[fd]->dequeue(str) )
                break ;
            cout << "(" << str << ") dequeue" << endl ;
            if( str == "" )
                break ;

            string s(str,1,1024) ;
            if(str[0]=='N'){
                cout << s << " inserted " << endl ;
                ThreadArray[fd].insertStkid(s) ;
            }
            if(str[0]=='D'){
                cout << s << " deleted " << endl ;
                ThreadArray[fd].deleteStkid(s) ;
            }
            __sync_sub_and_fetch(&mqcounter[fd],1) ;
        } //while

    } //endless  while
    printf("Thread...(%d) thread closed \n",fd) ;
    ThreadArray[fd].eraseall() ;
    ThreadArray[fd].setRealFlag(0) ;
    return(NULL);
} //doit               

arrow
arrow
    全站熱搜

    hedgezzz 發表在 痞客邦 留言(0) 人氣()