#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <string>

#include "mpmc.hpp"

using namespace std;

/**
 * Minimal base class that runs a member function on its own POSIX thread.
 *
 * A derived class implements InternalThreadEntry().  StartInternalThread()
 * runs that function on a new thread and WaitForInternalThreadToExit()
 * joins it.  Start and wait are meant to be called from the thread that
 * owns the object; they are not synchronised against each other.
 */
class MyThreadClass
{
public:
    MyThreadClass() : _thread(), _started(false) {}
    virtual ~MyThreadClass() {/* empty */}

    /**
     * Creates a thread that executes InternalThreadEntry() on this object.
     *
     * Calling it again while an earlier thread is still unjoined creates
     * another thread; only the most recently created one is remembered
     * for joining.
     *
     * @return true if the thread was created, false otherwise.
     */
    bool StartInternalThread()
    {
        if (pthread_create(&_thread, NULL, InternalThreadEntryFunc, this) != 0)
            return false;
        _started = true;
        return true;
    }

    /**
     * Blocks until the thread created by StartInternalThread() has returned.
     *
     * Does nothing when no thread was started or when it has already been
     * joined, so the call is always safe.
     */
    void WaitForInternalThreadToExit()
    {
        if (!_started)
            return;
        (void) pthread_join(_thread, NULL);
        _started = false;   // a pthread_t must not be joined twice
    }

protected:
    /** Thread body supplied by the derived class. */
    virtual void InternalThreadEntry() = 0;

private:
    /** pthread-compatible trampoline: forwards to InternalThreadEntry(). */
    static void * InternalThreadEntryFunc(void * This)
    {
        static_cast<MyThreadClass *>(This)->InternalThreadEntry();
        return NULL;
    }

    pthread_t _thread;    // handle of the most recently created thread
    bool      _started;   // true while _thread refers to an unjoined thread
};

// Demo worker: once started, prints a greeting line every second.
class Threadx : public MyThreadClass
{
public:
    // Thread body; loops forever and therefore never returns.
    void InternalThreadEntry()
    {
        for (;;) {
            printf("hrllo world \n");
            sleep(1);
        }
    }
};

class BufferedLog:public MyThreadClass
{
private:
    mpmc_bounded_queue<std::string>* mq;

    FILE   *logfp  ;
    pthread_t tid ;
    int    icnt ;
    sem_t  sem1 ;
    void InternalThreadEntry(){
        while(1){
            sem_wait(&sem1)  ;
            while( 1 ){
                std::string s ;
                if( mq->dequeue(s) ){
                    fprintf(logfp,"%s\n",s.c_str() ) ;
                    break ;
                }
            } //while
            usleep(100) ;
        } //while
    }
public:
    BufferedLog(const char* fname,int clearflag){
        if(clearflag)
            logfp = fopen(fname,"a+");
        else
            logfp = fopen(fname,"w");

        if(!logfp){
            printf("fname(%s) is wrong to fopen \n", fname ) ;
            exit( 0 ) ;
        }
        mq = new mpmc_bounded_queue<std::string>(512) ;
        sem_init(&sem1,0,0) ;
    }
    ~BufferedLog() {
        fclose(logfp) ;
    }
    bool enqueue(std::string s){
        bool bflag = mq->enqueue(s) ;
        if( bflag )
            sem_post(&sem1) ;
        return bflag ;
    }
} ;

// Shared sequence counter; the producer threads running func1 increment it
// atomically with __sync_add_and_fetch.
int counter = 0 ;
// Process-wide logger instance.  A clearflag of 0 makes the constructor open
// the file with mode "w".
BufferedLog blog("/home/informix/test/blog2.log",0) ;

int main(int argc, char* argv[])
{
    blog.StartInternalThread() ;

    void    *func1(void *);
    pthread_t tid[5] ;
    int       icnt[5] ;
    pthread_create(&tid[0], NULL, &func1 , (void *) icnt[0] );
    pthread_create(&tid[1], NULL, &func1 , (void *) icnt[1] );
    pthread_create(&tid[2], NULL, &func1 , (void *) icnt[2] );
    pthread_create(&tid[3], NULL, &func1 , (void *) icnt[3] );
    pthread_create(&tid[4], NULL, &func1 , (void *) icnt[4] );

    while( 1 ){
        sleep( 10 ) ;
        if(counter > 100000000)
            break ;
    }
}
void    *func1(void * inum)
{
    while(1){
        int iret = __sync_add_and_fetch(&counter,1) ;
        if(iret > 10000000)
            break ;
        char ptr[11]={0} ;
        sprintf(ptr,"%010d",iret ) ;
        string s(ptr) ;
        while( 1 ){
            if( blog.enqueue(s) )
                break ;
            usleep(100) ;
            continue ;
        }
        usleep(1) ;
    } //while
}

           

// Source: blog post by hedgezzz on PIXNET.