#include "mpmc.hpp"
#include "lightsem.hpp"

#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

#include <cstdio>
#include <cstdlib>
#include <string>

using namespace std;
 
/**
 * Small base class that runs a virtual member function on its own pthread.
 *
 * Derive from it, implement InternalThreadEntry(), then call
 * StartInternalThread(). The destructor does NOT join: the owner must call
 * WaitForInternalThreadToExit() (or otherwise make sure the thread is done)
 * before the object is destroyed, because the thread dereferences `this`.
 *
 * StartInternalThread() and WaitForInternalThreadToExit() are meant to be
 * called from the owning thread only; the bookkeeping flag is not atomic.
 */
class MyThreadClass
{
public:
    MyThreadClass() : _thread(), _started(false) {}
    virtual ~MyThreadClass() {/* empty */}

    /**
     * Launches InternalThreadEntry() on a new thread.
     *
     * @return true if a thread was created; false if pthread_create failed
     *         or a previously started thread has not been joined yet
     *         (starting again would overwrite the only handle to it).
     */
    bool StartInternalThread()
    {
        if (_started)
            return false;
        _started = (pthread_create(&_thread, NULL, InternalThreadEntryFunc, this) == 0);
        return _started;
    }

    /**
     * Blocks until the thread started by StartInternalThread() returns.
     * Does nothing when no thread is running or it was already joined, so
     * pthread_join is never handed an uninitialised or stale handle.
     */
    void WaitForInternalThreadToExit()
    {
        if (!_started)
            return;
        (void) pthread_join(_thread, NULL);
        _started = false;   // handle is consumed; allow a later restart
    }

protected:
    /** Body of the thread; runs on the thread created by StartInternalThread(). */
    virtual void InternalThreadEntry() = 0;

private:
    /** pthread-compatible trampoline: `This` is the object passed to pthread_create. */
    static void * InternalThreadEntryFunc(void * This)
    {
        static_cast<MyThreadClass *>(This)->InternalThreadEntry();
        return NULL;
    }

    pthread_t _thread;    // valid only while _started is true
    bool      _started;   // true between a successful create and the matching join
};
 
// Demo thread: once started it prints a greeting line to stdout, sleeps one
// second, and repeats. The loop has no exit condition, so the thread never
// returns on its own.
class Threadx : public MyThreadClass
{
public:
    virtual void InternalThreadEntry()
    {
        for (;;)
        {
            printf("hrllo world \n");
            sleep(1);
        }
    }
};
 
class BufferedLog:public MyThreadClass
{
private:
    mpmc_bounded_queue<std::string>* mq;
 
    FILE   *logfp  ;
    pthread_t tid ;
    int    icnt ;
    LightweightSemaphore sem1 ;
    AutoResetEvent ev ;
    void InternalThreadEntry(){
        while(1){
            ev.wait() ;
            //sem1.wait() ;
            while( 1 ){
                std::string s ;
                if( mq->dequeue(s) ){
                    fprintf(logfp,"%s\n",s.c_str() ) ;
                }else{
                    break ;
                }
            } //while
            usleep(1) ;
        } //while
    }
public:
    BufferedLog(const char* fname,int clearflag){
        if(clearflag)
            logfp = fopen(fname,"a+");
        else
            logfp = fopen(fname,"w");
 
        if(!logfp){
            printf("fname(%s) is wrong to fopen \n", fname ) ;
            exit( 0 ) ;
        }
        mq = new mpmc_bounded_queue<std::string>(512) ;
    }
    void flush(){ fflush(logfp) ; }
    ~BufferedLog() {
        fclose(logfp) ;
    }
    bool enqueue(std::string s){
        bool bflag = mq->enqueue(s) ;
        if( bflag ){
            ev.signal() ;
            //sem1.signal() ;
        }
        return bflag ;
    }
} ;
 
// Shared sequence number for the producer threads; func1 advances it with
// __sync_add_and_fetch and stops once the value exceeds 10000000.
int counter = 0 ;
// Process-wide log sink used by main and func1. The path is hard-coded; with
// clearflag 0 the BufferedLog constructor opens the file with fopen mode "w".
BufferedLog blog("/home/informix/test/blog2.log",0) ;
 
int main(int argc, char* argv[])
{
    blog.StartInternalThread() ;
 
    void    *func1(void *);
    pthread_t tid[5] ;
    int       icnt[5] ;
    pthread_create(&tid[0], NULL, &func1 , (void *) icnt[0] );
    pthread_create(&tid[1], NULL, &func1 , (void *) icnt[1] );
    pthread_create(&tid[2], NULL, &func1 , (void *) icnt[2] );
    pthread_create(&tid[3], NULL, &func1 , (void *) icnt[3] );
    pthread_create(&tid[4], NULL, &func1 , (void *) icnt[4] );
 
    void *tret ;
    for(int idx=0;idx<5;idx++)
        pthread_join(tid[idx],&tret) ;
    printf("Done ........................................ \n") ;
    blog.flush() ;
    sleep( 500 ) ;
}
void    *func1(void * inum)
{
    while(1){
        int iret = __sync_add_and_fetch(&counter,1) ;
        if(iret > 10000000)
            break ;
        char ptr[11]={0} ;
        sprintf(ptr,"%08d",iret ) ;
        string s(ptr) ;
        while( 1 ){
            if( blog.enqueue(s) ){
                break ;
            }
            usleep(100) ;
            continue ;
        }
        usleep(1) ;
    } //while
    sleep( 1 ) ;
}
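// --- Residue of the blog page this listing was copied from (not part of the program) ---
// arrow
// arrow
//     全站熱搜  (site-wide trending searches)
//
//     hedgezzz 發表在 痞客邦 留言(0) 人氣()  (posted by hedgezzz on PIXNET; comments (0), views ())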