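/*
 * How to stress epoll
 *
 * This program uses many pipes or sockets and two threads.
 * First we open up to N pipes|sockets (-n, default 1024), stopping at the
 * first failure, e.g. when the descriptor limit is reached (see ulimit -n).
 * Then we create a worker thread.
 * The worker thread writes one byte to each stream in turn, cycling through all of them.
 * The main thread uses epoll to collect ready events and clears them by reading the streams.
 * Each second, the number of collected events is printed on stderr.
 * After the test duration (-t, default 15 seconds), the program prints an average value and stops.
 *
 * Usage : epoll_bench [-n num] [-{u|i}] [-f] [-t duration] [-l limit] [-e maxepoll]
 *   -n : Number of streams to open (default 1024)
 *   -f : No epoll loop, just read the streams in a cyclic manner
 *   -u : Use AF_UNIX sockets (instead of pipes)
 *   -i : Use AF_INET sockets
 *   -t : Test duration in seconds (default 15)
 *   -l : Yield the CPU after an epoll_wait() returning fewer than limit events (default 1000)
 *   -e : Maximum number of events fetched per epoll_wait() call (default 1024)
 */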
#include <errno.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <sched.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/time.h>

/* Number of streams requested (-n); lowered to the number actually opened. */
int nbhandles = 1024;
/* Test duration (-t): number of 1-second timer ticks before the program exits. */
int time_test = 15;
/* Count of read() calls on a stream that returned data. */
unsigned long nbhandled;
/* Number of epoll_wait() calls made by the main loop ... */
unsigned long epw_samples;
/* ... and the running sum of their return values (events per call = cnt / samples). */
unsigned long epw_samples_cnt;

/* One stream: fd[0] is the read side (main thread), fd[1] the write side (worker thread). */
struct pipefd {
	int fd[2];
} *tab;

/* epoll instance holding every fd[0]; created only when -f is not given. */
int epoll_fd;
/* -f: read streams round-robin instead of using epoll. */
int fflag;
/* -u: use AF_UNIX socketpairs instead of pipes. */
int afunix;
/* -i: use loopback AF_INET TCP connections instead of pipes. */
int afinet;

/*
 * Open up to nbhandles streams and store them in tab[].
 *
 * Each stream is a pipe (default), an AF_UNIX socketpair (-u) or a
 * loopback TCP connection (-i).  tab[i].fd[0] is the read side consumed
 * by the main thread, tab[i].fd[1] the write side fed by the worker.
 * Unless -f was given, an epoll instance is created and every read side
 * is registered for EPOLLIN with its index as user data.
 *
 * Stream creation stops at the first failure (typically when the
 * descriptor limit is reached); nbhandles is then lowered to the number
 * of streams actually opened.
 *
 * Returns 0 on success, -1 if setup failed or no stream could be opened.
 */
static int alloc_streams(void)
{
	int i;
	int listen_sock = -1;
	struct sockaddr_in me, to;
	socklen_t namelen;
	int on = 1;
	int off = 0;

	if (nbhandles <= 0) {
		fprintf(stderr, "invalid number of handles: %d\n", nbhandles);
		return -1;
	}
	if (!fflag) {
		epoll_fd = epoll_create(nbhandles);
		if (epoll_fd == -1) {
			perror("epoll_create");
			return -1;
		}
	}
	tab = malloc(sizeof(*tab) * nbhandles);
	if (tab == NULL) {
		perror("malloc");
		return -1;
	}
	if (afinet) {
		listen_sock = socket(AF_INET, SOCK_STREAM, 0);
		if (listen_sock == -1) {
			perror("socket");
			return -1;
		}
		/* listen() on an unbound socket binds it to an ephemeral port. */
		if (listen(listen_sock, 256) == -1) {
			perror("listen");
			close(listen_sock);
			return -1;
		}
		namelen = sizeof(me);
		if (getsockname(listen_sock, (struct sockaddr *)&me, &namelen) == -1) {
			perror("getsockname");
			close(listen_sock);
			return -1;
		}
		/* The auto-bind address is INADDR_ANY; connect explicitly to loopback. */
		me.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	}
	for (i = 0; i < nbhandles; i++) {
		if (afinet) {
			int cfd = socket(AF_INET, SOCK_STREAM, 0);

			if (cfd == -1)
				break;
			to = me;
			/* Connect in non-blocking mode; both success and EINPROGRESS are fine. */
			ioctl(cfd, FIONBIO, &on);
			if (connect(cfd, (struct sockaddr *)&to, sizeof(to)) == -1 &&
			    errno != EINPROGRESS) {
				close(cfd);
				break;
			}
			namelen = sizeof(to);
			tab[i].fd[1] = accept(listen_sock, (struct sockaddr *)&to, &namelen);
			if (tab[i].fd[1] == -1) {
				close(cfd);
				break;
			}
			/* Back to blocking mode for the benchmark's read() calls. */
			ioctl(cfd, FIONBIO, &off);
			tab[i].fd[0] = cfd;
		} else if (afunix) {
			if (socketpair(AF_UNIX, SOCK_STREAM, 0, tab[i].fd) == -1)
				break;
		} else {
			if (pipe(tab[i].fd) == -1)
				break;
		}
		if (!fflag) {
			struct epoll_event ev = { .events = EPOLLIN };

			ev.data.u64 = (uint64_t)i;	/* index into tab[] */
			if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, tab[i].fd[0], &ev) == -1) {
				perror("epoll_ctl");
				close(tab[i].fd[0]);
				close(tab[i].fd[1]);
				break;
			}
		}
	}
	/* Accepted connections outlive the listener; release its descriptor. */
	if (listen_sock != -1)
		close(listen_sock);
	nbhandles = i;
	printf("%d handles setup\n", nbhandles);
	if (nbhandles == 0) {
		fprintf(stderr, "no stream could be opened\n");
		return -1;
	}
	return 0;
}

/* If line starts with "ctxt ", store its counter in *ctxt and return 1; otherwise return 0. */
static int proc_stat_ctxt_line(const char *line, long *ctxt)
{
	static const char key[] = "ctxt ";

	if (strncmp(line, key, sizeof(key) - 1) != 0)
		return 0;
	*ctxt = strtol(line + sizeof(key) - 1, NULL, 10);
	return 1;
}

/*
 * Read the cumulative context-switch count (the "ctxt" line) from
 * /proc/stat into *ctxt.
 *
 * The file is scanned in fixed-size chunks, so the "ctxt" line is found
 * even when earlier lines (per-CPU and "intr" counters) push it past the
 * first few kilobytes.  Only a stack buffer and open/read/close are used,
 * with no malloc, because this runs from the SIGALRM handler.
 *
 * Returns 0 when the value was found; otherwise -1 with *ctxt set to 0.
 */
int sample_proc_stat(long *ctxt)
{
	char chunk[4096];
	char line[64];		/* leading bytes of the line being scanned */
	size_t llen = 0;
	ssize_t n, i;
	int fd;

	*ctxt = 0;
	fd = open("/proc/stat", O_RDONLY);
	if (fd == -1) {
		perror("/proc/stat");
		return -1;
	}
	for (;;) {
		n = read(fd, chunk, sizeof(chunk));
		if (n == -1 && errno == EINTR)
			continue;
		if (n <= 0)
			break;
		for (i = 0; i < n; i++) {
			if (chunk[i] != '\n') {
				/* Only the line head is kept; "ctxt <n>" is short. */
				if (llen < sizeof(line) - 1)
					line[llen++] = chunk[i];
				continue;
			}
			line[llen] = '\0';
			llen = 0;
			if (proc_stat_ctxt_line(line, ctxt)) {
				close(fd);
				return 0;
			}
		}
	}
	close(fd);
	/* A final line may lack its trailing newline. */
	line[llen] = '\0';
	return proc_stat_ctxt_line(line, ctxt) ? 0 : -1;
}


/*
 * SIGALRM handler, driven once per second by the interval timer.
 *
 * It writes one line to stderr with:
 *   - the events handled since the previous tick;
 *   - the context switches per second taken from /proc/stat, once two
 *     samples are available;
 *   - in epoll mode, the average number of events per epoll_wait() call.
 *
 * After time_test ticks it prints the overall average rate and exits.
 * NOTE(review): exit() is not async-signal-safe; _exit() would be the
 * strict choice, at the cost of not flushing stdout.
 */
static void timer_func(int sig)
{
	char buffer[128];	/* the longest possible line below is 91 bytes */
	size_t len;
	static unsigned long old;
	static long oldctxt;
	static int alarm_events;
	long ctxt;
	unsigned long delta = nbhandled - old;

	(void)sig;
	old = nbhandled;
	len = sprintf(buffer, "%lu evts/sec", delta);
	if (sample_proc_stat(&ctxt) == 0) {
		/* The first successful sample only primes oldctxt. */
		if (oldctxt && ctxt > oldctxt)
			len += sprintf(buffer + len, " %lu ctxt/sec",
				       (unsigned long)(ctxt - oldctxt));
		oldctxt = ctxt;
	}
	if (epw_samples)
		len += sprintf(buffer + len, " %g samples per call",
			       (double)epw_samples_cnt / (double)epw_samples);
	buffer[len++] = '\n';
	write(STDERR_FILENO, buffer, len);

	if (++alarm_events >= time_test) {
		len = sprintf(buffer, "Avg: %lu evts/sec\n",
			      nbhandled / (unsigned long)alarm_events);
		write(STDERR_FILENO, buffer, len);
		exit(0);
	}
}

/*
 * Install timer_func() as the SIGALRM handler, then start a periodic
 * ITIMER_REAL timer that fires after 1 second and every second thereafter.
 *
 * If the handler cannot be installed, the program exits.  Arming the timer
 * anyway would let SIGALRM's default action kill the process.
 */
static void timer_setup(void)
{
	struct itimerval it;
	struct sigaction sg;

	memset(&sg, 0, sizeof(sg));
	sg.sa_handler = timer_func;
	sigemptyset(&sg.sa_mask);
	if (sigaction(SIGALRM, &sg, NULL) == -1) {
		perror("sigaction");
		exit(1);
	}

	it.it_value.tv_sec = 1;		/* first tick */
	it.it_value.tv_usec = 0;
	it.it_interval.tv_sec = 1;	/* then every second */
	it.it_interval.tv_usec = 0;
	if (setitimer(ITIMER_REAL, &it, NULL))
		perror("setitimer");
}

/*
 * Worker thread body.  It writes one byte to the write side of every
 * stream in turn, forever, and yields the CPU after each full pass.
 *
 * It first lowers its scheduling priority with nice(10).
 * NOTE(review): on Linux/NPTL nice() affects only the calling thread;
 * elsewhere it may apply to the whole process.
 */
static void *worker_thread_func(void *arg)
{
	const char c = 1;
	int idx;

	(void)arg;
	nice(10);
	for (;;) {
		for (idx = 0; idx < nbhandles; idx++)
			write(tab[idx].fd[1], &c, 1);
		sched_yield();	/* relax :) */
	}
}

/* Print the command-line synopsis on stderr and terminate with the given exit status. */
void usage(int code)
{
	static const char synopsis[] =
		"Usage : epoll_bench [-n num] [-{u|i}] [-f] [-t duration] [-l limit] [-e maxepoll]\n";

	fputs(synopsis, stderr);
	exit(code);
}

/*
 * Re-arm fd in the global epoll set for EPOLLIN, with priv as its user
 * data, using EPOLL_CTL_MOD.  *ev is used as scratch storage and is
 * overwritten.
 *
 * Returns the epoll_ctl() result: 0 on success, -1 with errno set on error.
 */
static int epoll_modify(struct epoll_event *ev, int fd, uint64_t priv)
{
	ev->events = EPOLLIN;
	ev->data.u64 = priv;
	return epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, ev);
}

/*
 * Parse options, open the streams, start the worker thread and the
 * 1-second statistics timer, then consume data until the timer handler
 * exits the process.
 *
 * With -f the streams are read round-robin with blocking read() calls.
 * Otherwise epoll_wait() reports readable streams; each one is read once
 * and re-armed with EPOLL_CTL_MOD.  When a wait returns fewer than
 * `limit` events, the main thread yields the CPU.
 */
int main(int argc, char *argv[])
{
	char buff[1024];
	pthread_t tid;
	struct epoll_event *events = NULL;
	int c, fd, err;
	int limit = 1000;
	int max_epoll = 1024;

	while ((c = getopt(argc, argv, "fuin:l:e:t:")) != -1) {
		switch (c) {
		case 'n': nbhandles = atoi(optarg); break;
		case 'f': fflag++; break;
		case 'l': limit = atoi(optarg); break;
		case 'e': max_epoll = atoi(optarg); break;
		case 't': time_test = atoi(optarg); break;
		case 'u': afunix++; break;
		case 'i': afinet++; break;
		default: usage(1);
		}
	}
	/* Both values size allocations (and max_epoll is passed to epoll_wait). */
	if (nbhandles <= 0 || (!fflag && max_epoll <= 0))
		usage(1);

	/*
	 * nbhandles is re-checked because it drops to the number of streams
	 * actually opened, and both loops below take an index modulo it.
	 */
	if (alloc_streams() != 0 || nbhandles <= 0)
		return 1;

	if (!fflag) {
		events = malloc(sizeof(*events) * max_epoll);
		if (events == NULL) {
			perror("malloc");
			return 1;
		}
	}

	err = pthread_create(&tid, NULL, worker_thread_func, NULL);
	if (err != 0) {
		fprintf(stderr, "pthread_create: %s\n", strerror(err));
		return 1;
	}
	timer_setup();

	if (fflag) {
		for (fd = 0;; fd = (fd + 1) % nbhandles) {
			if (read(tab[fd].fd[0], buff, sizeof(buff)) > 0)
				nbhandled++;
		}
	} else {
		for (;;) {
			int nb = epoll_wait(epoll_fd, events, max_epoll, -1);
			int i;

			if (nb == -1) {
				/*
				 * The statistics SIGALRM can interrupt epoll_wait(),
				 * which is never restarted automatically.  Do not let
				 * -1 leak into the unsigned sample counters.
				 */
				if (errno == EINTR)
					continue;
				perror("epoll_wait");
				return 1;
			}
			epw_samples++;
			epw_samples_cnt += nb;
			for (i = 0; i < nb; i++) {
				fd = tab[events[i].data.u64].fd[0];
				if (read(fd, buff, sizeof(buff)) > 0)
					nbhandled++;
				epoll_modify(&events[i], fd, events[i].data.u64);
			}
			if (nb < limit)
				sched_yield();
		}
	}
}
