patron 10 months ago
parent
commit
057906e37d
9 changed files with 614 additions and 0 deletions
  1. 11 0
      pipeBomb/CMakeLists.txt
  2. 25 0
      pipeBomb/LICENSE
  3. 10 0
      pipeBomb/Makefile
  4. 188 0
      pipeBomb/main.cpp
  5. 122 0
      pipeBomb/messages.cpp
  6. 66 0
      pipeBomb/messages.h
  7. 25 0
      testTimer/LICENSE
  8. 7 0
      testTimer/Makefile
  9. 160 0
      testTimer/timerTest.c

+ 11 - 0
pipeBomb/CMakeLists.txt

@@ -0,0 +1,11 @@
+project (pipebomb)
+cmake_minimum_required(VERSION 3.7)
+set(CMAKE_CXX_STANDARD 11)
+#add_compile_options(-g)
+add_compile_options(-O2)
+find_package (Threads)
+include_directories( ${PROJECT_NAME} ${CMAKE_SYSTEM_INCLUDE_PATH})
+add_executable (${PROJECT_NAME} main.cpp
+    messages.cpp
+	)
+target_link_libraries(pipebomb Threads::Threads rt)

+ 25 - 0
pipeBomb/LICENSE

@@ -0,0 +1,25 @@
+BSD 2-Clause License
+
+Copyright (c) 2017, David Marec
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+  list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+  this list of conditions and the following disclaimer in the documentation
+  and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

+ 10 - 0
pipeBomb/Makefile

@@ -0,0 +1,10 @@
+PROG_CXX=   pipeBomb
+NO_OBJ=
+#MK_DEBUG_FILES= no
+#MK_ASSERT_DEBUG= no
+SRCS=   main.cpp messages.cpp
+LDADD= ${LIBPTHREAD}
+CXXFLAGS= -O2
+CXXFLAGS.main.cpp= -O2 -DHAS_SIGINFO
+MAN= 
+.include <bsd.prog.mk>

+ 188 - 0
pipeBomb/main.cpp

@@ -0,0 +1,188 @@
+#include <unistd.h>
+#include <iostream>                                                                                       
+#include <err.h>                                                                                          
+#include <pthread.h>
+#include <poll.h>
+#include <pthread.h>
+#include <sys/time.h>
+#include "messages.h"
+#include <signal.h>
+#include <map>
+#include <limits.h>
+#include <memory>
+
+double timediff( 
+		struct timeval const &start_time,
+		struct timeval const &end_time
+		)
+{
+	const long long micromill (1000*1000);
+	const double dmicromill (static_cast<double>(micromill));
+	struct timeval tdiff;
+
+	timersub(&end_time,&start_time,&tdiff);
+	return static_cast<double>(tdiff.tv_sec)+(static_cast<double>(tdiff.tv_usec)/dmicromill);
+
+}
+
+void* firstThread(void* outlet){
+
+	if(outlet){
+		membuf of;
+		int fd(*static_cast<int*>(outlet));
+
+		struct timeval tv,tve;
+		gettimeofday(&tv,nullptr);
+
+		for(size_t c(0);c<1000;++c){
+			for(size_t cc(0);cc<1000;++cc){
+				testMessage t(double(c)+double(cc)/1000);
+				t.serialize(of);
+				write(fd,of.buffer(),of.size());
+				of.reset();
+			}
+		}
+
+		lastMessage eot;
+		eot.serialize(of);
+
+		write(fd,of.buffer(),of.size());
+		gettimeofday(&tve,nullptr);
+		double *tdiff(new double(timediff(tv,tve)));
+		pthread_exit(tdiff);
+	}
+	pthread_exit(nullptr);
+}
+
+void handleFds(struct pollfd fd,size_t &ct,size_t& c,bool& eot){
+
+	static char buf2[PIPE_BUF];
+	static membuf ii;
+
+	if(fd.revents & POLLIN)	{
+		++ct;
+		ii.reset();
+		size_t n(read(fd.fd,buf2,sizeof(uint32_t)));
+		if(n==sizeof(uint32_t)){
+			uint32_t l = (uint32_t)buf2[0] << 24 |
+				(uint32_t)buf2[1] << 16 |
+				(uint32_t)buf2[2] << 8  |
+				(uint32_t)buf2[3];
+			if(l){
+				l-=n;
+				n=read(fd.fd,buf2,l);
+				if(n){
+					ii.set(buf2,buf2+n);
+					std::unique_ptr<pipeMessage> pp(pipeMessage::deserialize(ii));
+					if(pp.get()){
+						++c;
+						switch(pp->id()){
+							case 2:
+								eot= true;
+								break;
+							case 1:
+							if(testMessage* ppp=dynamic_cast<testMessage*>(pp.get())){
+									double d=ppp->data();
+								}else {
+									std::cerr << "unable to cast message #" << pp->id() << std::endl;
+								}
+								break;
+							default: 
+								std::cerr << "unknown message #" << pp->id() << std::endl;
+						}
+					}else
+						std::cerr << "message cast error." << std::endl;
+				}
+				else
+					std::cerr << "error while getting data" << std::endl;
+			}
+		}else
+			std::cerr << "error while getting length" << std::endl;
+	}
+}
+size_t c(0),ct(0);
+void progress(int){
+	std::cout  <<  c << "/" << ct<< " messages handled for now."  << std::endl;
+}
+void lostReader(int){
+	std::cerr  << "pipe reader has ended." << std::endl;
+}
+
+int main(int argc, char* argv[]){
+
+	int nb_threads(2);
+	if(argc>1){
+		nb_threads=atoi(argv[1]);
+	}
+#ifdef HAS_SIGINFO
+	if(signal(SIGINFO,progress)==SIG_ERR)
+		perror("SIGINFO not supported.");
+#endif
+
+	if(signal(SIGPIPE,lostReader)==SIG_ERR)
+		perror("SIGPIPE not supported.");
+
+	pthread_t *pid(new pthread_t[nb_threads]);
+	struct pollfd* fds(new struct pollfd[nb_threads]);
+
+	registerMessage(1,testMessage::create);
+	registerMessage(2,lastMessage::create);
+
+	int *fdp(new int[nb_threads*2]);
+	for(size_t n(0);n<nb_threads;++n){
+		if(0>pipe2(&fdp[n*2],0))  {
+			perror("pipe thread.");
+			exit(-1);
+		}
+		pthread_create(&pid[n] ,nullptr,&firstThread,&fdp[n*2+1]);
+		fds[n].fd=fdp[n*2];
+		fds[n].events= POLLIN;
+	}
+
+	struct timeval tv,tve;
+	gettimeofday(&tv,nullptr);
+	int quit(0);
+
+	for(;quit<nb_threads;){
+		switch(poll(fds,nb_threads,-1)){
+			case 0:
+				std::cout << "timeout" << std::endl;
+				break;
+			case -1:
+				perror("Poll");
+				exit(-1);
+				break;
+			default:
+
+				for(size_t n(0);n<nb_threads;++n){
+					bool eot(false);
+					handleFds(fds[n],ct,c,eot);
+					if(eot){
+						std::cout << "Last Message from " << fds[n].fd << std::endl;
+						++quit;
+					}
+
+				}
+		}
+
+	}
+
+
+	for(size_t n(0);n<nb_threads;++n){
+		double *r_thread;
+		pthread_join(pid[n],reinterpret_cast<void**>(&r_thread));
+		if(r_thread){
+			std::cout << "time to write thread " << n <<":\t" << *r_thread << " sec."<< std::endl;
+			delete r_thread;
+		}
+		close(fdp[n*2]);
+		close(fdp[n*2+1]);
+	}
+	delete[] fdp;
+	delete[] pid;
+	delete[] fds;
+
+
+	gettimeofday(&tve,nullptr);
+	std::cout << "time to read " << c << "/" << ct<< " messages:" << timediff(tv,tve)<< " sec." << std::endl;
+}

+ 122 - 0
pipeBomb/messages.cpp

@@ -0,0 +1,122 @@
+#include <unistd.h>
+#include <map>
+#include "messages.h"
+
+membuf::membuf( size_t size){// may use size max
+	m_buffer.reserve(size);
+	reset();
+}
+
+void membuf::reset(){
+	m_cursor=sizeof m_cursor;
+	m_buffer.resize(m_cursor);
+}
+
+membuf::~membuf(){
+}
+
+void membuf::write(const char* c,size_t s){
+	size_t newsize(m_buffer.size()+s);
+	m_buffer.resize(newsize);
+	while(m_cursor<newsize){
+		m_buffer[m_cursor++]=*(c++);
+	}
+	m_buffer[0]= (m_cursor>> 24);
+	m_buffer[1]= (m_cursor>> 16);
+	m_buffer[2]= (m_cursor>> 8);
+	m_buffer[3]= (m_cursor);
+
+}
+void membuf::read(char*c ,size_t s){
+	size_t nc(m_cursor+s);
+	if(nc>m_buffer.size())
+		return;
+	while(m_cursor<nc){
+		*(c++)=m_buffer[m_cursor++];
+	}
+}
+
+const char* membuf::buffer() const{
+	return m_buffer.data();
+}
+
+size_t membuf::size() const {
+	return m_buffer.size();
+}
+
+void membuf::set(const char* i0, const char* i1) {
+	reset();
+	m_buffer.insert(m_buffer.begin()+sizeof m_cursor,i0,i1);
+}
+
+
+typedef pipeMessage* (*Factory)(membuf&);//avoid construct of first use issue 
+static std::map<int,Factory> pipeMessages;
+void registerMessage(int id, pipeMessage* (*Factory)(membuf&)){
+	pipeMessages[id]=Factory;
+}
+nullMessage::nullMessage():pipeMessage(){
+};
+nullMessage::~nullMessage(){
+};
+
+int nullMessage::id() const { return 0;}
+void nullMessage::serialize(membuf& out) const{
+}
+pipeMessage* nullMessage::create(membuf& in) { 
+	return new nullMessage();
+};
+
+
+pipeMessage* pipeMessage::deserialize(membuf& istr) {
+	int classId(0);
+	istr.read(reinterpret_cast<char*>(&classId),sizeof (int));
+	if(pipeMessages.count(classId)==0) {
+		return new nullMessage;
+	}
+	return pipeMessages[classId](istr);
+}
+
+pipeMessage::pipeMessage(){
+}
+
+pipeMessage::~pipeMessage(){
+}
+
+testMessage::testMessage(double d):pipeMessage(),m_data(d){
+};
+testMessage::~testMessage(){
+};
+pipeMessage* testMessage::create(membuf& in) { 
+	double d;
+	in.read(reinterpret_cast<char*>(&d), sizeof d);
+	return new testMessage(d);
+};
+
+int testMessage::id() const { return m_id;};
+void testMessage::serialize(membuf& out) const{
+	out.write(reinterpret_cast<const char*>(&m_id),sizeof (int));
+	out.write(reinterpret_cast<const char*>(&m_data),
+			sizeof m_data);
+}
+
+double testMessage::data() const{
+	return m_data;
+}
+
+lastMessage::lastMessage():pipeMessage(){
+};
+lastMessage::~lastMessage(){
+};
+pipeMessage* lastMessage::create(membuf& in) { 
+	return new lastMessage();
+};
+
+int lastMessage::id() const { return m_id;};
+void lastMessage::serialize(membuf& out) const{
+	out.write(reinterpret_cast<const char*>(&m_id),sizeof (int));
+}
+
+int testMessage::m_id=1;
+int lastMessage::m_id=2;
+

+ 66 - 0
pipeBomb/messages.h

@@ -0,0 +1,66 @@
+#include <stdint.h>
+#include <vector>
+
+class membuf;
+
+class membuf //to avoid streambuf from boost
+{
+	public:
+		membuf( size_t size=64 );
+		~membuf();
+		void write(const char* c,size_t s);
+		void read(char*c ,size_t s);
+		const char* buffer() const;
+		void set(const char* , const char*);
+		size_t size() const;
+		void reset();
+	private:
+		std::vector<char> m_buffer;
+		uint32_t m_cursor;
+};
+class pipeMessage{
+	public:
+
+		pipeMessage();
+		virtual ~pipeMessage();
+
+		virtual void serialize(membuf&) const=0;
+		static	pipeMessage* deserialize(membuf& istr);
+		virtual int id()const =0;
+};
+
+void registerMessage(int id, pipeMessage* (*Factory)(membuf&));
+class nullMessage:public pipeMessage{
+	public:
+		nullMessage();
+		~nullMessage();
+		static	pipeMessage* create(membuf& in);
+		int id() const;
+		void serialize(membuf& out) const;
+};
+
+
+class lastMessage:public pipeMessage{
+	public:
+		lastMessage();
+		~lastMessage();
+		static	pipeMessage* create(membuf& in);
+		int id() const;
+		void serialize(membuf& out) const;
+	private:
+		static int m_id; // may be automatically calculated on registering
+};
+
+class testMessage:public pipeMessage{
+	public:
+		testMessage(double d);
+		~testMessage();
+		static	pipeMessage* create(membuf& in);
+		int id() const;
+		void serialize(membuf& out) const;
+		double data() const;
+	private:
+		double m_data;
+		static int m_id; // may be automatically calculated on registering
+};
+

+ 25 - 0
testTimer/LICENSE

@@ -0,0 +1,25 @@
+BSD 2-Clause License
+
+Copyright (c) 2017, David Marec
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+  list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+  this list of conditions and the following disclaimer in the documentation
+  and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

+ 7 - 0
testTimer/Makefile

@@ -0,0 +1,7 @@
+PROG_CXX=   testTimer
+NO_OBJ=
+SRCS= timerTest.c
+LDADD= ${LIBRT}
+CFLAGS= -O2
+MAN= 
+.include <bsd.prog.mk>

+ 160 - 0
testTimer/timerTest.c

@@ -0,0 +1,160 @@
+#include <unistd.h>
+#include <time.h>
+#include <signal.h>
+#include <stdio.h>
+#include <sys/queue.h>
+#include <stdlib.h>
+#include <string.h>
+#include <err.h>
+
+/*
+ * signal and timer example
+ * 
+ * timerTest.c
+ */
+
+/*
+ * map timer code to a string
+ * 
+ */
+
+#define MAX_LEN_NAME 128
+
+SLIST_HEAD(slisthead, entry) head = SLIST_HEAD_INITIALIZER(head);
+struct slisthead *headp;	/* Singly-linked List head. */
+struct entry {
+	char   *name;
+	int		code;
+	SLIST_ENTRY   (entry) entries;	/* Singly-linked List. */
+} *np;
+
+/*
+ * signal handler
+ * 
+ */
+
+static int spankme;
+
+void
+sigHandler(int signo, siginfo_t * si, void * /* ucontext_t */ uap)
+{
+	switch (signo) {
+	case SIGALRM:
+		/* timer timeout */
+		SLIST_FOREACH(np, &head, entries) {
+			if (np && np->code == si->si_value.sival_int) {
+				printf("%s", np->name);
+				fflush(stdout);
+				break;
+			}
+		}
+		if (np == NULL)
+			printf("Timeout #%d, unknown.\n", si->si_value.sival_int);
+		break;
+	case SIGINFO:
+		printf("no need to panic.\n");
+		break;
+	case SIGINT:
+		/* exit flag */
+		++spankme;
+		break;
+	default:
+		printf("sig:%di received.", signo);
+		break;
+	}
+
+}
+
+int
+main(int argc, char *argv[])
+{
+	SLIST_INIT(&head);
+
+	/* timer pointers */
+	size_t nb_timers=0;
+	if(argc>1){
+		nb_timers=atoi(argv[1]);
+	}
+
+	if(nb_timers<1){
+		nb_timers=5;
+	}
+
+	timer_t		timerid[nb_timers];
+	char       *names[nb_timers];
+	
+
+	/* build text to display for each */
+	for(size_t i=0;i<nb_timers;++i){
+
+		names[i]=malloc(MAX_LEN_NAME);
+		if(i==0){
+			strncpy(names[i],".",MAX_LEN_NAME);
+			continue;
+		}
+		if(MAX_LEN_NAME<=snprintf(names[i],MAX_LEN_NAME,"+%zu+",i*2+1)){
+			warnx("string %zu shorten",i);
+		}
+	}
+
+	/* signal */
+	struct sigaction sa;
+	sa.sa_handler = NULL;
+	sa.sa_sigaction = &sigHandler;
+	sa.sa_flags = SA_SIGINFO;
+	sigemptyset(&sa.sa_mask);
+	if (sigaction(SIGINFO, &sa, NULL)) {
+		warn("SIGNFO not caught.");
+	}
+	if (sigaction(SIGALRM, &sa, NULL)) {
+		err(2, "sigaction:ALARM");
+	}
+	if (sigaction(SIGINT, &sa, NULL)) {
+		err(2, "sigaction:INTERRUPT");
+	}
+
+	/* timer */
+	for (size_t i = 0; i < nb_timers; ++i) {
+		if (timer_create(CLOCK_REALTIME, NULL, &timerid[i])) {
+			warn("timer_create failed for %zu", i);
+			continue;
+		}
+		/* 3 -> TIMER_MAX stored into timerid->oshandle */
+		printf("Timer #%d now created at address %p.\n", timer_oshandle_np(timerid[i]),timerid[i]);
+		np = malloc(sizeof(struct entry));	/* Insert at the head. */
+		np->code = timer_oshandle_np(timerid[i]);
+		np->name=names[i];
+
+		SLIST_INSERT_HEAD(&head, np, entries);
+
+		/* Set values and start timer */
+		struct itimerspec its;
+		its.it_interval.tv_sec = 1 + i * 2;
+		its.it_interval.tv_nsec = 0;
+		its.it_value.tv_sec = its.it_interval.tv_sec;
+		its.it_value.tv_nsec = 0;
+
+		if (timer_settime(timerid[i], 0, &its, NULL)) {
+			warn("timer_settime failed for %zu", i);
+		}
+	}
+
+	printf("waiting...\n");
+	for (; !spankme;) {
+		pause();
+	} //loop
+
+	printf("\nbye.\n");
+	for (size_t i = 0; i < nb_timers; ++i) {
+		timer_delete(timerid[i]);
+	}
+
+	while (!SLIST_EMPTY(&head)) {
+		np = SLIST_FIRST(&head);
+		SLIST_REMOVE_HEAD(&head, entries);
+		free(np->name);
+		free(np);
+	}
+
+
+}