#include <iostream>
#include <vector>
#include <map>
#include <string>
#include <algorithm>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <memory.h>
#include <string.h>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <semaphore.h>
#include <netinet/tcp.h>
#include <sys/un.h>
#include <pthread.h>
#include <sys/time.h>


#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/event-config.h>       
#include <stdarg.h>       
#include <event2/util.h>
#include <event2/buffer.h>
#include "../DoubleWordCas/lockfreequeue.hpp"

using namespace std ;

#define LISTEN_PORT 4444
#define LISTEN_BACKLOG 32

void do_accept(evutil_socket_t listener, short event, void *arg);
void read_cb(struct bufferevent *bev, void *arg);
void error_cb(struct bufferevent *bev, short event, void *arg);
void write_cb(struct bufferevent *bev, void *arg);
void DoStkidVec(int,char *) ;


#define MAXCLIENT 1024

struct stkidvec_
{
    vector<string> v_old ;
    vector<string> v_new ;
} ;
typedef struct stkidvec_ stkidvec ;
stkidvec clientarray[MAXCLIENT] ;

class queuedata
{
public:
    queuedata()
    {
        strcpy(qdata,"") ;
    }
    queuedata(char* s)
    {
        strcpy(qdata,s) ;
    }
    const char* getdata() const { return qdata ; }
    queuedata& operator=(const queuedata& q) //copy assignment
    {
        strcpy(qdata,q.getdata()) ;
        return *this ;
    }
private :
    char qdata[128] ;
} ;



#define MAXSTKID 20
LockFreeQueue<queuedata> lfqueue[MAXSTKID] ;
map<int,int> StkidMap[MAXSTKID] ;

struct datafeed_
{
    char stkid[11] ;
    double bestbuyprc1 ;
    double bestsellprc1 ;
    int  volume ;
} ;
typedef struct datafeed_ datafeed ;
datafeed DataFeed[MAXSTKID] ;

void InitDataFeed(void)
{
    int idx,idy,idz ;
    char strtmp[128] ;
    for(idx=0;idx<MAXSTKID;idx++)
    {
        sprintf(strtmp,"00%02d",idx) ;
        strtmp[4]=0x00 ;
        strcpy(DataFeed[idx].stkid,strtmp) ;
        DataFeed[idx].bestbuyprc1 = (idx + 1) * 0.1 ;
        DataFeed[idx].bestsellprc1 = (idx + 1) * 0.15 ;
        DataFeed[idx].volume = (idx + 1) * 1000 ;
    } //for
}

int LocateLockFreeQueue(const string& stkid)
{
    int istkid ;
    istkid = atoi(stkid.c_str()) ;
    if( (istkid >= MAXSTKID) || (istkid < 0) )
        return -1 ;
    return istkid ;
}

void *funcqueue(void *arg)
{
    int threadid = (int)(long) arg;
    int idx,idy,idz ;
    pthread_detach(pthread_self());
    while(1)
    {
        for(idx=0;idx<MAXSTKID;idx++)
        {
            queuedata q ;
            char action[8],stkid[18] ;
            int fd ;
            while(lfqueue[idx].queue_pop(q) > 0)
            {
                const char *ptr = q.getdata() ;
                sscanf(ptr,"%s %d %s",action,&fd,stkid) ;
                //printf("(%s)(%d)(%s) poped in (%d)\n",action,fd,stkid,idx ) ;
                if(strncmp(action,"A",1)==0)
                {
                    StkidMap[idx].insert(pair<int,int>(fd,fd) ) ;
                }else if(strncmp(action,"D",1)==0){
                    StkidMap[idx].erase(fd) ;
                }
            } //while
            std::map<int,int>::iterator it ;
            for (it=StkidMap[idx].begin(); it!=StkidMap[idx].end(); ++it)
            {
                //printf("To (%d) send out  idx=(%d)\n",it->second,idx);
                char data[128] ;
                sprintf(data,"%10s|%05.2f|%05.2f|%07d|",DataFeed[idx].stkid,
                DataFeed[idx].bestbuyprc1,DataFeed[idx].bestsellprc1,
                DataFeed[idx].volume ) ;
                //printf("To (%d) send out(%s)\n",it->second,data) ;
                send(it->second,data,strlen(data),MSG_NOSIGNAL) ;
            }
            //printf("====================================== \n") ;
        } // for
        usleep(1000) ;
    } //while
} //funcqueue


int main(int argc, char *argv[])
{
    int ret;
    evutil_socket_t listener;
    listener = socket(AF_INET, SOCK_STREAM, 0);
    assert(listener > 0);
    evutil_make_listen_socket_reuseable(listener);

    struct sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(LISTEN_PORT);

    int idx,idy,idz ;

    InitDataFeed() ;

    pthread_t tqueue ;
    int iCPU = 0 ;
    pthread_create(&tqueue,NULL,funcqueue,(void *)(long)iCPU );

    if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return 1;
    }

    if (listen(listener, LISTEN_BACKLOG) < 0) {
        perror("listen");
        return 1;
    }

    printf ("Listening...\n");

    evutil_make_socket_nonblocking(listener);

    struct event_base *base = event_base_new();
    assert(base != NULL);
    struct event *listen_event;
    listen_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    event_add(listen_event, NULL);
    event_base_dispatch(base);

    printf("The End.");
    return 0;
}

void do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = (struct event_base *)arg;
    evutil_socket_t fd;
    struct sockaddr_in sin;
    socklen_t slen;
    fd = accept(listener, (struct sockaddr *)&sin, &slen);
    if (fd < 0) {
        perror("accept");
        return;
    }
    if (fd > FD_SETSIZE) {
        perror("fd > FD_SETSIZE\n");
        return;
    }

    printf("ACCEPT: fd = %u\n", fd);

    int idx,idy,idz  ;

    struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, read_cb, NULL, error_cb, arg);
    bufferevent_enable(bev, EV_READ|EV_PERSIST);
}

void read_cb(struct bufferevent *bev, void *arg)
{
    evutil_socket_t fd = bufferevent_getfd(bev);
    struct evbuffer *input = bufferevent_get_input(bev);

    while( 1 ) {
        int datalen = 0 ;
        char buflen[5]={0},record[8192]={0} ;

        size_t len1 = evbuffer_get_length(input);
        if(len1 < 4)
            return ;
        evbuffer_copyout(input, buflen, 4) ;

        try{
            string s(buflen) ;
            datalen = stoi(s) ;
        }catch(std::exception const &e){
            std::cout << "error:" << e.what() << std::endl ;
            return ;
        }

        size_t len2 = evbuffer_get_length(input);
        if(len2 < (4+datalen) )
            return ;
        evbuffer_drain(input, 4);
        evbuffer_remove(input,record,datalen) ;
        record[datalen] = 0x00 ;
        //fprintf(logfp,"(%d)receive(%s)\n",fd,record) ;
        printf("(%d)receive(%s)\n",fd,record) ;
        DoStkidVec(fd,record) ;
    } //while
    return ;
}

void write_cb(struct bufferevent *bev, void *arg) {}

void error_cb(struct bufferevent *bev, short event, void *arg)
{
    evutil_socket_t fd = bufferevent_getfd(bev);
    printf("fd = %u, ", fd);
    if (event & BEV_EVENT_TIMEOUT) {
        printf("Timed out\n"); //if bufferevent_set_timeouts() called
    }
    else if (event & BEV_EVENT_EOF) {
        for(int i=0;i<clientarray[fd].v_old.size();i++)
        {
            int iarr = LocateLockFreeQueue(clientarray[fd].v_old[i]) ;
            if(iarr >= 0)
            {
                char str[128] ;
                sprintf(str,"%s %d %s","D",fd,clientarray[fd].v_old[i].c_str());
                //cout <<"iarr=(" << iarr << ")(" <<  str << ")" << endl ;
                queuedata q(str) ;
                lfqueue[iarr].queue_push(q) ;
            }
            //cout << clientarray[fd].v_old[i] << "removed" << endl ;
        } //for
        clientarray[fd].v_old.clear() ;
        clientarray[fd].v_new.clear() ;
        printf("connection closed\n");
    }
    else if (event & BEV_EVENT_ERROR) {
        printf("some other error\n");
    }
    bufferevent_free(bev);
}


void DoStkidVec(int fd,char *record)
{
    clientarray[fd].v_new.clear() ;
    char *pch ;
    pch = strtok(record,"@") ;
    while(pch != NULL)
    {
        //printf("(%s)\n",pch) ;
        string s(pch) ;
        clientarray[fd].v_new.push_back(s) ;
        pch = strtok(NULL,"@") ;
    } //while


    for(int i=0;i<clientarray[fd].v_new.size();i++)
    {
        vector<string>::iterator it;

        it = find(clientarray[fd].v_old.begin(),clientarray[fd].v_old.end(),clientarray[fd].v_new[i] ) ;
        if( it == clientarray[fd].v_old.end() )
        {
            int iarr = LocateLockFreeQueue(clientarray[fd].v_new[i]) ;
            if(iarr >= 0)
            {
                char str[128] ;
                sprintf(str,"%s %d %s","A",fd,clientarray[fd].v_new[i].c_str());
                //cout <<"iarr=(" << iarr << ")(" <<  str << ")" << endl ;
                queuedata q(str) ;
                lfqueue[iarr].queue_push(q) ;
            }
            //cout << clientarray[fd].v_new[i] << "added" << endl ;
        }
    } //for
    //cout << endl ;

    for(int i=0;i<clientarray[fd].v_old.size();i++)
    {
        vector<string>::iterator it;

        it = find(clientarray[fd].v_new.begin(),clientarray[fd].v_new.end(),clientarray[fd].v_old[i] ) ;
        if( it == clientarray[fd].v_new.end() )
        {
            int iarr = LocateLockFreeQueue(clientarray[fd].v_old[i]) ;
            if(iarr >= 0)
            {
                char str[128] ;
                sprintf(str,"%s %d %s","D",fd,clientarray[fd].v_old[i].c_str());
                //cout <<"iarr=(" << iarr << ")(" <<  str << ")" << endl ;
                queuedata q(str) ;
                lfqueue[iarr].queue_push(q) ;
            }
            //cout << clientarray[fd].v_old[i] << "removed" << endl ;
        }
    } //for
    //cout << "=================================" << endl ;

    clientarray[fd].v_old = clientarray[fd].v_new ;
}

創作者介紹
創作者 hedgezzz 的頭像
hedgezzz

hedgezzz的部落格

hedgezzz 發表在 痞客邦 留言(0) 人氣()