#include #include #include #include #include #include #include #include "messages.h" #include #include #include #include double timediff( struct timeval const &start_time, struct timeval const &end_time ) { const long long micromill (1000*1000); const double dmicromill (static_cast(micromill)); struct timeval tdiff; timersub(&end_time,&start_time,&tdiff); return static_cast(tdiff.tv_sec)+(static_cast(tdiff.tv_usec)/dmicromill); } void* firstThread(void* outlet){ if(outlet){ membuf of; int fd(*static_cast(outlet)); struct timeval tv,tve; gettimeofday(&tv,nullptr); for(size_t c(0);c<1000;++c){ for(size_t cc(0);cc<1000;++cc){ testMessage t(double(c)+double(cc)/1000); t.serialize(of); write(fd,of.buffer(),of.size()); of.reset(); } } lastMessage eot; eot.serialize(of); write(fd,of.buffer(),of.size()); gettimeofday(&tve,nullptr); double *tdiff(new double(timediff(tv,tve))); pthread_exit(tdiff); } pthread_exit(nullptr); } void handleFds(struct pollfd fd,size_t &ct,size_t& c,bool& eot){ static char buf2[PIPE_BUF]; static membuf ii; if(fd.revents & POLLIN) { ++ct; ii.reset(); size_t n(read(fd.fd,buf2,sizeof(uint32_t))); if(n==sizeof(uint32_t)){ uint32_t l = (uint32_t)buf2[0] << 24 | (uint32_t)buf2[1] << 16 | (uint32_t)buf2[2] << 8 | (uint32_t)buf2[3]; if(l){ l-=n; n=read(fd.fd,buf2,l); if(n){ ii.set(buf2,buf2+n); std::unique_ptr pp(pipeMessage::deserialize(ii)); if(pp.get()){ ++c; switch(pp->id()){ case 2: eot= true; break; case 1: if(testMessage* ppp=dynamic_cast(pp.get())){ double d=ppp->data(); }else { std::cerr << "unable to cast message #" << pp->id() << std::endl; } break; default: std::cerr << "unknown message #" << pp->id() << std::endl; } }else std::cerr << "message cast error." << std::endl; } else std::cerr << "error while getting data" << std::endl; } }else std::cerr << "error while getting length" << std::endl; } } size_t c(0),ct(0); void progress(int){ std::cout << c << "/" << ct<< " messages handled for now." << std::endl; } void lostReader(int){ std::cerr << "pipe reader has ended." << std::endl; } int main(int argc, char* argv[]){ int nb_threads(2); if(argc>1){ nb_threads=atoi(argv[1]); } #ifdef HAS_SIGINFO if(signal(SIGINFO,progress)==SIG_ERR) perror("SIGINFO not supported."); #endif if(signal(SIGPIPE,lostReader)==SIG_ERR) perror("SIGPIPE not supported."); pthread_t *pid(new pthread_t[nb_threads]); struct pollfd* fds(new struct pollfd[nb_threads]); registerMessage(1,testMessage::create); registerMessage(2,lastMessage::create); int *fdp(new int[nb_threads*2]); for(size_t n(0);npipe2(&fdp[n*2],0)) { perror("pipe thread."); exit(-1); } pthread_create(&pid[n] ,nullptr,&firstThread,&fdp[n*2+1]); fds[n].fd=fdp[n*2]; fds[n].events= POLLIN; } struct timeval tv,tve; gettimeofday(&tv,nullptr); int quit(0); for(;quit(&r_thread)); if(r_thread){ std::cout << "time to write thread " << n <<":\t" << *r_thread << " sec."<< std::endl; delete r_thread; } close(fdp[n*2]); close(fdp[n*2+1]); } delete[] fdp; delete[] pid; delete[] fds; gettimeofday(&tve,nullptr); std::cout << "time to read " << c << "/" << ct<< " messages:" << timediff(tv,tve)<< " sec." << std::endl; }