#include <iostream>
#include <vector>
#include <map>
#include <string>
#include <algorithm>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <memory.h>
#include <string.h>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <semaphore.h>
#include <netinet/tcp.h>
#include <sys/un.h>
#include <pthread.h>
#include <sys/time.h>
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/event-config.h>
#include <stdarg.h>
#include <event2/util.h>
#include <event2/buffer.h>
#include "../DoubleWordCas/lockfreequeue.hpp"
using namespace std ;
#define LISTEN_PORT 4444
#define LISTEN_BACKLOG 32
void do_accept(evutil_socket_t listener, short event, void *arg);
void read_cb(struct bufferevent *bev, void *arg);
void error_cb(struct bufferevent *bev, short event, void *arg);
void write_cb(struct bufferevent *bev, void *arg);
void DoStkidVec(int,char *) ;
#define MAXCLIENT 1024
struct stkidvec_
{
vector<string> v_old ;
vector<string> v_new ;
} ;
typedef struct stkidvec_ stkidvec ;
stkidvec clientarray[MAXCLIENT] ;
class queuedata
{
public:
queuedata()
{
strcpy(qdata,"") ;
}
queuedata(char* s)
{
strcpy(qdata,s) ;
}
const char* getdata() const { return qdata ; }
queuedata& operator=(const queuedata& q) //copy assignment
{
strcpy(qdata,q.getdata()) ;
return *this ;
}
private :
char qdata[128] ;
} ;
#define MAXSTKID 20
LockFreeQueue<queuedata> lfqueue[MAXSTKID] ;
map<int,int> StkidMap[MAXSTKID] ;
struct datafeed_
{
char stkid[11] ;
double bestbuyprc1 ;
double bestsellprc1 ;
int volume ;
} ;
typedef struct datafeed_ datafeed ;
datafeed DataFeed[MAXSTKID] ;
void InitDataFeed(void)
{
int idx,idy,idz ;
char strtmp[128] ;
for(idx=0;idx<MAXSTKID;idx++)
{
sprintf(strtmp,"00%02d",idx) ;
strtmp[4]=0x00 ;
strcpy(DataFeed[idx].stkid,strtmp) ;
DataFeed[idx].bestbuyprc1 = (idx + 1) * 0.1 ;
DataFeed[idx].bestsellprc1 = (idx + 1) * 0.15 ;
DataFeed[idx].volume = (idx + 1) * 1000 ;
} //for
}
int LocateLockFreeQueue(const string& stkid)
{
int istkid ;
istkid = atoi(stkid.c_str()) ;
if( (istkid >= MAXSTKID) || (istkid < 0) )
return -1 ;
return istkid ;
}
void *funcqueue(void *arg)
{
int threadid = (int)(long) arg;
int idx,idy,idz ;
pthread_detach(pthread_self());
while(1)
{
for(idx=0;idx<MAXSTKID;idx++)
{
queuedata q ;
char action[8],stkid[18] ;
int fd ;
while(lfqueue[idx].queue_pop(q) > 0)
{
const char *ptr = q.getdata() ;
sscanf(ptr,"%s %d %s",action,&fd,stkid) ;
//printf("(%s)(%d)(%s) poped in (%d)\n",action,fd,stkid,idx ) ;
if(strncmp(action,"A",1)==0)
{
StkidMap[idx].insert(pair<int,int>(fd,fd) ) ;
}else if(strncmp(action,"D",1)==0){
StkidMap[idx].erase(fd) ;
}
} //while
std::map<int,int>::iterator it ;
for (it=StkidMap[idx].begin(); it!=StkidMap[idx].end(); ++it)
{
//printf("To (%d) send out idx=(%d)\n",it->second,idx);
char data[128] ;
sprintf(data,"%10s|%05.2f|%05.2f|%07d|",DataFeed[idx].stkid,
DataFeed[idx].bestbuyprc1,DataFeed[idx].bestsellprc1,
DataFeed[idx].volume ) ;
//printf("To (%d) send out(%s)\n",it->second,data) ;
send(it->second,data,strlen(data),MSG_NOSIGNAL) ;
}
//printf("====================================== \n") ;
} // for
usleep(1000) ;
} //while
} //funcqueue
int main(int argc, char *argv[])
{
int ret;
evutil_socket_t listener;
listener = socket(AF_INET, SOCK_STREAM, 0);
assert(listener > 0);
evutil_make_listen_socket_reuseable(listener);
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(LISTEN_PORT);
int idx,idy,idz ;
InitDataFeed() ;
pthread_t tqueue ;
int iCPU = 0 ;
pthread_create(&tqueue,NULL,funcqueue,(void *)(long)iCPU );
if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
perror("bind");
return 1;
}
if (listen(listener, LISTEN_BACKLOG) < 0) {
perror("listen");
return 1;
}
printf ("Listening...\n");
evutil_make_socket_nonblocking(listener);
struct event_base *base = event_base_new();
assert(base != NULL);
struct event *listen_event;
listen_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
event_add(listen_event, NULL);
event_base_dispatch(base);
printf("The End.");
return 0;
}
void do_accept(evutil_socket_t listener, short event, void *arg)
{
struct event_base *base = (struct event_base *)arg;
evutil_socket_t fd;
struct sockaddr_in sin;
socklen_t slen;
fd = accept(listener, (struct sockaddr *)&sin, &slen);
if (fd < 0) {
perror("accept");
return;
}
if (fd > FD_SETSIZE) {
perror("fd > FD_SETSIZE\n");
return;
}
printf("ACCEPT: fd = %u\n", fd);
int idx,idy,idz ;
struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, read_cb, NULL, error_cb, arg);
bufferevent_enable(bev, EV_READ|EV_PERSIST);
}
void read_cb(struct bufferevent *bev, void *arg)
{
evutil_socket_t fd = bufferevent_getfd(bev);
struct evbuffer *input = bufferevent_get_input(bev);
while( 1 ) {
int datalen = 0 ;
char buflen[5]={0},record[8192]={0} ;
size_t len1 = evbuffer_get_length(input);
if(len1 < 4)
return ;
evbuffer_copyout(input, buflen, 4) ;
try{
string s(buflen) ;
datalen = stoi(s) ;
}catch(std::exception const &e){
std::cout << "error:" << e.what() << std::endl ;
return ;
}
size_t len2 = evbuffer_get_length(input);
if(len2 < (4+datalen) )
return ;
evbuffer_drain(input, 4);
evbuffer_remove(input,record,datalen) ;
record[datalen] = 0x00 ;
//fprintf(logfp,"(%d)receive(%s)\n",fd,record) ;
printf("(%d)receive(%s)\n",fd,record) ;
DoStkidVec(fd,record) ;
} //while
return ;
}
void write_cb(struct bufferevent *bev, void *arg) {}
void error_cb(struct bufferevent *bev, short event, void *arg)
{
evutil_socket_t fd = bufferevent_getfd(bev);
printf("fd = %u, ", fd);
if (event & BEV_EVENT_TIMEOUT) {
printf("Timed out\n"); //if bufferevent_set_timeouts() called
}
else if (event & BEV_EVENT_EOF) {
for(int i=0;i<clientarray[fd].v_old.size();i++)
{
int iarr = LocateLockFreeQueue(clientarray[fd].v_old[i]) ;
if(iarr >= 0)
{
char str[128] ;
sprintf(str,"%s %d %s","D",fd,clientarray[fd].v_old[i].c_str());
//cout <<"iarr=(" << iarr << ")(" << str << ")" << endl ;
queuedata q(str) ;
lfqueue[iarr].queue_push(q) ;
}
//cout << clientarray[fd].v_old[i] << "removed" << endl ;
} //for
clientarray[fd].v_old.clear() ;
clientarray[fd].v_new.clear() ;
printf("connection closed\n");
}
else if (event & BEV_EVENT_ERROR) {
printf("some other error\n");
}
bufferevent_free(bev);
}
void DoStkidVec(int fd,char *record)
{
clientarray[fd].v_new.clear() ;
char *pch ;
pch = strtok(record,"@") ;
while(pch != NULL)
{
//printf("(%s)\n",pch) ;
string s(pch) ;
clientarray[fd].v_new.push_back(s) ;
pch = strtok(NULL,"@") ;
} //while
for(int i=0;i<clientarray[fd].v_new.size();i++)
{
vector<string>::iterator it;
it = find(clientarray[fd].v_old.begin(),clientarray[fd].v_old.end(),clientarray[fd].v_new[i] ) ;
if( it == clientarray[fd].v_old.end() )
{
int iarr = LocateLockFreeQueue(clientarray[fd].v_new[i]) ;
if(iarr >= 0)
{
char str[128] ;
sprintf(str,"%s %d %s","A",fd,clientarray[fd].v_new[i].c_str());
//cout <<"iarr=(" << iarr << ")(" << str << ")" << endl ;
queuedata q(str) ;
lfqueue[iarr].queue_push(q) ;
}
//cout << clientarray[fd].v_new[i] << "added" << endl ;
}
} //for
//cout << endl ;
for(int i=0;i<clientarray[fd].v_old.size();i++)
{
vector<string>::iterator it;
it = find(clientarray[fd].v_new.begin(),clientarray[fd].v_new.end(),clientarray[fd].v_old[i] ) ;
if( it == clientarray[fd].v_new.end() )
{
int iarr = LocateLockFreeQueue(clientarray[fd].v_old[i]) ;
if(iarr >= 0)
{
char str[128] ;
sprintf(str,"%s %d %s","D",fd,clientarray[fd].v_old[i].c_str());
//cout <<"iarr=(" << iarr << ")(" << str << ")" << endl ;
queuedata q(str) ;
lfqueue[iarr].queue_push(q) ;
}
//cout << clientarray[fd].v_old[i] << "removed" << endl ;
}
} //for
//cout << "=================================" << endl ;
clientarray[fd].v_old = clientarray[fd].v_new ;
}
- Sep 16 Wed 2015 16:17
libevent server demo
close
全站熱搜
留言列表
發表留言