因為 clients 超過 1000 個,所以採用 event 方式處理連線,而不是替每個 client 產生一個 thread。每個 client 會把自己想看的股票代碼傳進來,此時透過 lockfreequeue 把 client 欲新增/刪除的股票代碼傳給對應股票的 map;map 維護的是該股票需要傳送行情的 client file descriptor,而且每支股票都有自己的 lockfreequeue。

所以在傳送行情之前,會先把 lockfreequeue 裡面要新增/刪除的 fd 套用到 map,之後才開始傳送。因為中間過程都是 lock free,速度上當然快;而 lockfreequeue 使用的是 unbounded buffer,所以不像 bounded buffer 的實作方式那樣需要預先配置(浪費)記憶體。

 

所以,如果你需要讓一萬個 clients 收行情,可以把行情放在 shared memory,同時跑十支這樣的程式,應該可行。

目前用 20 檔股票代碼寫一個簡單的運作範例,真實環境可以到 10000 檔股票代碼。10000 檔股票不可能用 loop 去找該股票的 array index,目前 prefer 使用 skip list。

thread pool 方面,假設有 10 個 thread 服務 10000 檔股票,也就是每個 thread 要傳送 1000 檔股票的行情,最簡單的方式就是 polling:有新行情就傳送到 map 裡面的 fd,沒有就跳過。另外可以用一個結構記錄每檔股票的傳送序號,有新序號才送。

此時不宜使用 queue,因為假設 queue 有以下順序: 2330->2301->2330->1101->2330->1102->2330,使用 queue 時要控制 2330 只傳一次,需要花點時間設計;就算使用 priority queue,也需要避免 2330 被不同 thread 傳送多次。polling 加上以結構更新序號的做法,讓程式好維護很多。

 

#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event-config.h>
#include <event2/event.h>
#include <event2/util.h>

#include "../DoubleWordCas/lockfreequeue.hpp"

using namespace std ;

// TCP port the server binds to (converted with htons() in main()).
#define LISTEN_PORT 4444
// Backlog value handed to listen() in main().
#define LISTEN_BACKLOG 32

// Listener callback: registered with event_new() in main(); accepts one connection.
void do_accept(evutil_socket_t listener, short event, void *arg);
// Per-connection read callback: installed with bufferevent_setcb() in do_accept().
void read_cb(struct bufferevent *bev, void *arg);
// Per-connection event callback (EOF / error / timeout): installed in do_accept().
void error_cb(struct bufferevent *bev, short event, void *arg);
// Write callback stub; do_accept() passes NULL for the write slot, so it is not installed there.
void write_cb(struct bufferevent *bev, void *arg);
// Applies one subscription request (fd, '@'-separated stock codes) for a client.
void DoStkidVec(int,char *) ;


// Number of slots in clientarray; the array is indexed directly by socket fd.
#define MAXCLIENT 1024

// Per-client subscription state.
struct stkidvec_
{
    // Stock codes from the previously applied request (replaced by v_new at the
    // end of DoStkidVec()).
    vector<string> v_old ;
    // Stock codes parsed from the request currently being processed.
    vector<string> v_new ;
} ;
typedef struct stkidvec_ stkidvec ;
// One entry per possible client fd; DoStkidVec() and error_cb() index it with the fd.
stkidvec clientarray[MAXCLIENT] ;

/*
 * Fixed-size text message carried through the per-stock LockFreeQueue.
 *
 * The payload is a NUL-terminated string of at most 127 characters. Longer
 * input is truncated rather than overflowing the internal buffer, and a NULL
 * pointer is treated as an empty string.
 */
class queuedata
{
public:
    /* Creates an empty message. */
    queuedata()
    {
        qdata[0] = '\0' ;
    }
    /* Creates a message from a C string; copies at most 127 characters. */
    queuedata(const char* s)
    {
        snprintf(qdata, sizeof(qdata), "%s", (s != NULL) ? s : "") ;
    }
    /* Returns the stored NUL-terminated text. */
    const char* getdata() const { return qdata ; }
    /* Copy assignment; self-assignment is a no-op (avoids an overlapping copy). */
    queuedata& operator=(const queuedata& q) //copy assignment
    {
        if (this != &q)
            snprintf(qdata, sizeof(qdata), "%s", q.getdata()) ;
        return *this ;
    }
private :
    char qdata[128] ;
} ;

 

// Number of stock slots; sizes lfqueue, StkidMap and DataFeed.
#define MAXSTKID 20
// One queue per stock slot. DoStkidVec()/error_cb() push "A|D <fd> <code>" text
// commands; funcqueue() pops and applies them.
LockFreeQueue<queuedata> lfqueue[MAXSTKID] ;
// One map per stock slot holding the subscribed client fds (stored as fd -> fd);
// it is only modified and iterated inside funcqueue().
map<int,int> StkidMap[MAXSTKID] ;

// One quote record per stock slot; funcqueue() formats these fields into the
// '|'-separated line sent to subscribers.
struct datafeed_
{
    char stkid[11] ;        // stock code as a NUL-terminated string
    double bestbuyprc1 ;    // presumably the best bid price -- confirm with the feed spec
    double bestsellprc1 ;   // presumably the best ask price -- confirm with the feed spec
    int  volume ;           // volume figure sent as a 7-digit zero-padded field
} ;
typedef struct datafeed_ datafeed ;
// Quote table filled by InitDataFeed() and read by funcqueue().
datafeed DataFeed[MAXSTKID] ;

void InitDataFeed(void)
{
    int idx,idy,idz ;
    char strtmp[128] ;
    for(idx=0;idx<MAXSTKID;idx++)
    {
        sprintf(strtmp,"00%02d",idx) ;
        strtmp[4]=0x00 ;
        strcpy(DataFeed[idx].stkid,strtmp) ;
        DataFeed[idx].bestbuyprc1 = (idx + 1) * 0.1 ;
        DataFeed[idx].bestsellprc1 = (idx + 1) * 0.15 ;
        DataFeed[idx].volume = (idx + 1) * 1000 ;
    } //for
}

int LocateLockFreeQueue(const string& stkid)
{
    int istkid ;
    istkid = atoi(stkid.c_str()) ;
    if( (istkid >= MAXSTKID) || (istkid < 0) )
        return -1 ;
    return istkid ;
}

void *funcqueue(void *arg)
{
    int threadid = (int)(long) arg;
    int idx,idy,idz ;
    pthread_detach(pthread_self());
    while(1)
    {
        for(idx=0;idx<MAXSTKID;idx++)
        {
            queuedata q ;
            char action[8],stkid[18] ;
            int fd ;
            while(lfqueue[idx].queue_pop(q) > 0)
            {
                const char *ptr = q.getdata() ;
                sscanf(ptr,"%s %d %s",action,&fd,stkid) ;
                //printf("(%s)(%d)(%s) poped in (%d)\n",action,fd,stkid,idx ) ;
                if(strncmp(action,"A",1)==0)
                {
                    StkidMap[idx].insert(pair<int,int>(fd,fd) ) ;
                }else if(strncmp(action,"D",1)==0){
                    StkidMap[idx].erase(fd) ;
                }
            } //while
            std::map<int,int>::iterator it ;
            for (it=StkidMap[idx].begin(); it!=StkidMap[idx].end(); ++it)
            {
                //printf("To (%d) send out  idx=(%d)\n",it->second,idx);
                char data[128] ;
                sprintf(data,"%10s|%05.2f|%05.2f|%07d|",DataFeed[idx].stkid,
                DataFeed[idx].bestbuyprc1,DataFeed[idx].bestsellprc1,
                DataFeed[idx].volume ) ;
                //printf("To (%d) send out(%s)\n",it->second,data) ;
                send(it->second,data,strlen(data),MSG_NOSIGNAL) ;
            }
            //printf("====================================== \n") ;
        } // for
        usleep(1000) ;
    } //while
} //funcqueue

int main(int argc, char *argv[])
{
    int ret;
    evutil_socket_t listener;
    listener = socket(AF_INET, SOCK_STREAM, 0);
    assert(listener > 0);
    evutil_make_listen_socket_reuseable(listener);

    struct sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(LISTEN_PORT);

    int idx,idy,idz ;

    InitDataFeed() ;

    pthread_t tqueue ;
    int iCPU = 0 ;
    pthread_create(&tqueue,NULL,funcqueue,(void *)(long)iCPU );

    if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return 1;
    }

    if (listen(listener, LISTEN_BACKLOG) < 0) {
        perror("listen");
        return 1;
    }

    printf ("Listening...\n");

    evutil_make_socket_nonblocking(listener);

    struct event_base *base = event_base_new();
    assert(base != NULL);
    struct event *listen_event;
    listen_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    event_add(listen_event, NULL);
    event_base_dispatch(base);

    printf("The End.");
    return 0;
}

/*
 * Listener callback: accepts one pending connection and wraps it in a
 * bufferevent whose read/event callbacks are read_cb and error_cb.
 *
 * listener  listening socket that became readable
 * event     libevent event flags (unused)
 * arg       the struct event_base* passed to event_new() in main()
 *
 * Connections whose descriptor does not fit in clientarray are closed
 * immediately, because the fd is used directly as the array index.
 */
void do_accept(evutil_socket_t listener, short event, void *arg)
{
    (void) event ;
    struct event_base *base = (struct event_base *)arg;
    struct sockaddr_in sin;
    // accept() reads this as the size of the address buffer, so it must be
    // initialised before the call.
    socklen_t slen = sizeof(sin);

    evutil_socket_t fd = accept(listener, (struct sockaddr *)&sin, &slen);
    if (fd < 0) {
        perror("accept");
        return;
    }
    if (fd >= MAXCLIENT) {
        // Not an errno condition, so report it directly; close the socket so
        // the descriptor is not leaked.
        fprintf(stderr, "fd = %d does not fit in clientarray (MAXCLIENT = %d), closing\n",
                (int) fd, MAXCLIENT);
        evutil_closesocket(fd);
        return;
    }

    printf("ACCEPT: fd = %u\n", (unsigned) fd);

    struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    if (bev == NULL) {
        fprintf(stderr, "bufferevent_socket_new failed for fd = %d\n", (int) fd);
        evutil_closesocket(fd);
        return;
    }
    bufferevent_setcb(bev, read_cb, NULL, error_cb, arg);
    bufferevent_enable(bev, EV_READ|EV_PERSIST);
}

void read_cb(struct bufferevent *bev, void *arg)
{
    char buflen[5],record[256] ;
    int idy,n;
    evutil_socket_t fd = bufferevent_getfd(bev);
    struct evbuffer *input = bufferevent_get_input(bev);
    int datalen = 0 ;

    size_t len1 = evbuffer_get_length(input);
    if(len1 < 4)
        return ;
    evbuffer_copyout(input, buflen, 4) ;
    buflen[4]=0x00 ;
    if( (buflen[0] < '0') || (buflen[0] > '9')  ||
        (buflen[1] < '0') || (buflen[1] > '9')  ||
        (buflen[2] < '0') || (buflen[2] > '9')  ||
        (buflen[3] < '0') || (buflen[3] > '9')   )
    {
        size_t lentmp = evbuffer_get_length(input);
        evbuffer_drain(input,lentmp) ;
        return ;
    }
    datalen = atoi(buflen) ;
    size_t len2 = evbuffer_get_length(input);
    if(len2 < (4+datalen) )
        return ;
    evbuffer_drain(input, 4);
    evbuffer_remove(input,record,datalen) ;
    record[datalen] = 0x00 ;
    //printf("(%d)(%s)\n",fd,record) ;
    DoStkidVec(fd,record) ;
}

/* Write callback placeholder: intentionally does nothing. */
void write_cb(struct bufferevent *bev, void *arg)
{
    (void) bev ;
    (void) arg ;
}

void error_cb(struct bufferevent *bev, short event, void *arg)
{
    evutil_socket_t fd = bufferevent_getfd(bev);
    printf("fd = %u, ", fd);
    if (event & BEV_EVENT_TIMEOUT) {
        printf("Timed out\n"); //if bufferevent_set_timeouts() called
    }
    else if (event & BEV_EVENT_EOF) {
        for(int i=0;i<clientarray[fd].v_old.size();i++)
        {
            int iarr = LocateLockFreeQueue(clientarray[fd].v_old[i]) ;
            if(iarr >= 0)
            {
                char str[128] ;
                sprintf(str,"%s %d %s","D",fd,clientarray[fd].v_old[i].c_str());
                //cout <<"iarr=(" << iarr << ")(" <<  str << ")" << endl ;
                queuedata q(str) ;
                lfqueue[iarr].queue_push(q) ;
            }
            //cout << clientarray[fd].v_old[i] << "removed" << endl ;
        } //for
        clientarray[fd].v_old.clear() ;
        clientarray[fd].v_new.clear() ;
        printf("connection closed\n");
    }
    else if (event & BEV_EVENT_ERROR) {
        printf("some other error\n");
    }
    bufferevent_free(bev);
}

void DoStkidVec(int fd,char *record)
{
    clientarray[fd].v_new.clear() ;
    char *pch ;
    pch = strtok(record,"@") ;
    while(pch != NULL)
    {
        //printf("(%s)\n",pch) ;
        string s(pch) ;
        clientarray[fd].v_new.push_back(s) ;
        pch = strtok(NULL,"@") ;
    } //while


    for(int i=0;i<clientarray[fd].v_new.size();i++)
    {
        vector<string>::iterator it;

        it = find(clientarray[fd].v_old.begin(),clientarray[fd].v_old.end(),clientarray[fd].v_new[i] ) ;
        if( it == clientarray[fd].v_old.end() )
        {
            int iarr = LocateLockFreeQueue(clientarray[fd].v_new[i]) ;
            if(iarr >= 0)
            {
                char str[128] ;
                sprintf(str,"%s %d %s","A",fd,clientarray[fd].v_new[i].c_str());
                //cout <<"iarr=(" << iarr << ")(" <<  str << ")" << endl ;
                queuedata q(str) ;
                lfqueue[iarr].queue_push(q) ;
            }
            //cout << clientarray[fd].v_new[i] << "added" << endl ;
        }
    } //for
    //cout << endl ;

    for(int i=0;i<clientarray[fd].v_old.size();i++)
    {
        vector<string>::iterator it;

        it = find(clientarray[fd].v_new.begin(),clientarray[fd].v_new.end(),clientarray[fd].v_old[i] ) ;
        if( it == clientarray[fd].v_new.end() )
        {
            int iarr = LocateLockFreeQueue(clientarray[fd].v_old[i]) ;
            if(iarr >= 0)
            {
                char str[128] ;
                sprintf(str,"%s %d %s","D",fd,clientarray[fd].v_old[i].c_str());
                //cout <<"iarr=(" << iarr << ")(" <<  str << ")" << endl ;
                queuedata q(str) ;
                lfqueue[iarr].queue_push(q) ;
            }
            //cout << clientarray[fd].v_old[i] << "removed" << endl ;
        }
    } //for
    //cout << "=================================" << endl ;

    clientarray[fd].v_old = clientarray[fd].v_new ;
}

=================================================

Client :

void *thread2(void *param)
{
    char strdata[81] ;
    int icnt = 0,nread=0 ;
    char stkid[11] ;
    int volume ;
    double bestbuyprc1,bestsellprc1 ;
    char strarr[4][20] ;
    for ( ; ; )
    {
        memset(strdata,0x00,sizeof(strdata) ) ;
        if ( (nread = recv(sd,strdata,31,MSG_NOSIGNAL)) <= 0)
        {
            break;
        }
        strdata[31]=0x00 ;
        //printf("recieve(%s)\n",strdata) ;
        memset(stkid,0x00,sizeof(stkid)) ;
        sscanf(strdata, "%[^'|']|%[^'|']|%[^'|']|%[^'|']|",
        &strarr[0],&strarr[1],&strarr[2],&strarr[3] ) ;
        //printf("(%s)(%s)(%s)(%s)\n",strarr[0],strarr[1],strarr[2],strarr[3]);
    }
}

void *thread1(void *param)
{
    char strx[128] ;
    int idx,inum = 0 ;
    while(1)
    {
        inum++ ;
        if( (inum%3) == 0){
            strcpy(strx,"0030") ;
            strcat(strx,"@0001@0002@0003")  ;
            strcat(strx,"@0004@0005@0006")  ;
        }else if( (inum%3) == 1){
            strcpy(strx,"0040") ;
            strcat(strx,"@0007@0002@0003")  ;
            strcat(strx,"@0004@0005@0008")  ;
            strcat(strx,"@0014@0015")  ;
        }else{
            strcpy(strx,"0045") ;
            strcat(strx,"@0001@0002@0003")  ;
            strcat(strx,"@0004@0005@0016")  ;
            strcat(strx,"@0019@0014@0017")  ;
        }
        printf("(%d)(%s)\n",strlen(strx),strx) ;
        if(send(sd,strx,strlen(strx),MSG_NOSIGNAL) <=0)
            exit(0) ;
        sleep(10) ;
    }
}

===============================================

compile :

g++ --std=c++0x datafeedserver.cpp -levent -lpthread  -o datafeedserver.exe
g++ --std=c++0x datafeedclient.cpp -levent -o datafeedclient.exe

 

arrow
arrow
    全站熱搜

    hedgezzz 發表在 痞客邦 留言(0) 人氣()