close
#include "mpmc.hpp"
#include "lightsem.hpp"
#include <semaphore.h>
using
namespace
std;
class
MyThreadClass
{
public
:
MyThreadClass() {
/* empty */
}
virtual
~MyThreadClass() {
/* empty */
}
bool
StartInternalThread()
{
return
(pthread_create(&_thread, NULL, InternalThreadEntryFunc,
this
) == 0);
}
void
WaitForInternalThreadToExit()
{
(
void
) pthread_join(_thread, NULL);
}
protected
:
virtual
void
InternalThreadEntry() = 0;
private
:
static
void
* InternalThreadEntryFunc(
void
* This) {((MyThreadClass *)This)->InternalThreadEntry();
return
NULL;}
pthread_t _thread;
};
class
Threadx:
public
MyThreadClass
{
public
:
void
InternalThreadEntry(){
while
( 1 ){
printf
(
"hrllo world \n"
) ;
sleep( 1 ) ;
}
}
} ;
class
BufferedLog:
public
MyThreadClass
{
private
:
mpmc_bounded_queue<std::string>* mq;
FILE
*logfp ;
pthread_t tid ;
int
icnt ;
LightweightSemaphore sem1 ;
AutoResetEvent ev ;
void
InternalThreadEntry(){
while
(1){
ev.wait() ;
//sem1.wait() ;
while
( 1 ){
std::string s ;
if
( mq->dequeue(s) ){
fprintf
(logfp,
"%s\n"
,s.c_str() ) ;
}
else
{
break
;
}
}
//while
usleep(1) ;
}
//while
}
public
:
BufferedLog(
const
char
* fname,
int
clearflag){
if
(clearflag)
logfp =
fopen
(fname,
"a+"
);
else
logfp =
fopen
(fname,
"w"
);
if
(!logfp){
printf
(
"fname(%s) is wrong to fopen \n"
, fname ) ;
exit
( 0 ) ;
}
mq =
new
mpmc_bounded_queue<std::string>(512) ;
}
void
flush(){
fflush
(logfp) ; }
~BufferedLog() {
fclose
(logfp) ;
}
bool
enqueue(std::string s){
bool
bflag = mq->enqueue(s) ;
if
( bflag ){
ev.
signal
() ;
//sem1.signal() ;
}
return
bflag ;
}
} ;
int
counter = 0 ;
BufferedLog blog(
"/home/informix/test/blog2.log"
,0) ;
int
main(
int
argc,
char
* argv[])
{
blog.StartInternalThread() ;
void
*func1(
void
*);
pthread_t tid[5] ;
int
icnt[5] ;
pthread_create(&tid[0], NULL, &func1 , (
void
*) icnt[0] );
pthread_create(&tid[1], NULL, &func1 , (
void
*) icnt[1] );
pthread_create(&tid[2], NULL, &func1 , (
void
*) icnt[2] );
pthread_create(&tid[3], NULL, &func1 , (
void
*) icnt[3] );
pthread_create(&tid[4], NULL, &func1 , (
void
*) icnt[4] );
void
*tret ;
for
(
int
idx=0;idx<5;idx++)
pthread_join(tid[idx],&tret) ;
printf
(
"Done ........................................ \n"
) ;
blog.flush() ;
sleep( 500 ) ;
}
void
*func1(
void
* inum)
{
while
(1){
int
iret = __sync_add_and_fetch(&counter,1) ;
if
(iret > 10000000)
break
;
char
ptr[11]={0} ;
sprintf
(ptr,
"%08d"
,iret ) ;
string s(ptr) ;
while
( 1 ){
if
( blog.enqueue(s) ){
break
;
}
usleep(100) ;
continue
;
}
usleep(1) ;
}
//while
sleep( 1 ) ;
}
全站熱搜
留言列表