1. 像資料庫一樣,使用一個 port listen , 當接收到 連線時 , 立刻使用 pass fd through unix domain
將 fd 轉到其他 process , 例如 50支程式 , 每支 500 threads , 可以服務兩萬五千個 clients ,
平均分給 50 支程式服務 , resource 越多 , 可以執行程式越多,服務越多 , 行情程式一份 ,所以程式全都
去 shared memory 讀取....!!
2. 真正服務的 socket server 透過 Unix domain 收到 fd , 可以使用 libevent 將 server 設計成
non-blocking server , 這樣 client 要傳任何值互動變得容易 , 最重要的是 , libevent 幫你管理 socket buffer ,
你的每個封包 , 前面四碼是表示封包長度的這種應用 , non-blocking 需要花結構管理且易出錯 , 然而 libevent
讓這個工作變得相當容易 .
3. 使用 StringHashTable 來存取 shared memory location !!! 定位每秒上千萬次 , 使用 seqlock
可從 shared memory copy 至 local , 每秒 50 萬次 !!!
4. 對每個 fd create thread , 這讓程式很好管理 , 程式容易維護且不影響速度 .
5. process/process , thread/thread 完全不用 lock ,所以就算 socket
程式執行一百支,也不影響 行情更新performance ,就算 read/write lock
也是使用到 lock ,seqlock 是使用 lock-free 的方法 , 它被應用在 linux kernel ,
只能使用在 Intel , AMD 這種 strong memory model 的 cpu.
以上步驟可建立 datafeed socket server !!!....且服務大量的 clients , 而且 clients 可任意 增減 股票代碼
無須重新連線 !!!!!
程式碼片段 :
void * doit(void *arg)
{
pthread_detach(pthread_self());
int fd = (int)(arg) ;
ThreadArray[fd].setWantFlag(1) ;
ThreadArray[fd].setRealFlag(1) ;
StringHashTable map(512) ;
while(1)
{
if(ThreadArray[fd].getWantFlag() != 1)
break ;
usleep(1000) ;
string s ;
for(int idx=0;idx<ThreadArray[fd].getStkidCnt();idx++){
try{
s = ThreadArray[fd][idx] ;
}catch(...){}
uint sharedMemIdx ;
if( hashtable.findkey( s.c_str(),sharedMemIdx ) ){
;
}else
continue ;
datafeed localdata ;
uint MapIdx ;
if( map.findkey( s.c_str(),MapIdx ) ){
int iret = -1 ;
iret = getFeedData( sharedMemIdx,MapIdx,localdata ) ;
if( iret >= 0 ){
map[s.c_str()] = iret ;
processFeed( fd,localdata ) ;
}
//cout << iret << "***" << MapIdx << endl ;
}else{
int iret = -1 ,iSeqno = -1 ;
iret = getFeedData( sharedMemIdx,iSeqno,localdata ) ;
if( iret >= 0 ){
map[s.c_str()] = iret ;
processFeed( fd,localdata ) ;
}
}
} //for
while(1){
string str ;
if(mqcounter[fd]==0)
break ;
cout << "mqcounter[fd]=" << mqcounter[fd] << endl ;
if( !mqptr[fd]->dequeue(str) )
break ;
cout << "(" << str << ") dequeue" << endl ;
if( str == "" )
break ;
string s(str,1,1024) ;
if(str[0]=='N'){
cout << s << " inserted " << endl ;
ThreadArray[fd].insertStkid(s) ;
}
if(str[0]=='D'){
cout << s << " deleted " << endl ;
ThreadArray[fd].deleteStkid(s) ;
}
__sync_sub_and_fetch(&mqcounter[fd],1) ;
} //while
} //endless while
printf("Thread...(%d) thread closed \n",fd) ;
ThreadArray[fd].eraseall() ;
ThreadArray[fd].setRealFlag(0) ;
return(NULL);
} //doit
留言列表