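/*
 * How to stress epoll
 *
 * This program uses many pipes/sockets and two threads.
 * (The event loop uses the kevent interface -- /dev/kevent, kevent_ctl()
 * and kevent_get_events() -- in place of epoll.)
 * First we open as many pipes/sockets as we can (see ulimit -n).
 * Then we create a worker thread.
 * The worker thread writes one byte at a time to the streams, in round-robin order.
 * The main thread collects ready events and clears them by reading the streams.
 * Each second, the number of collected events is printed on stderr.
 * After the test duration (15 seconds by default, see -t), the program
 * prints the average rate and stops.
 *
 * Usage : epoll_bench [-f] [-{u|i}] [-n num] [-t duration] [-l limit] [-e maxepoll]
 *   -f : No event loop; the main thread just reads the streams in a cyclic manner
 *   -u : Use AF_UNIX sockets (instead of pipes)
 *   -i : Use AF_INET sockets
 *   -n : Number of streams to try to open (default 1024)
 *   -t : Test duration in seconds (default 15)
 *   -l : Yield the CPU after a kevent call that returned fewer than this
 *        many events (default 1000)
 *   -e : Maximum number of events fetched per kevent call (default 1024)
 */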
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <sched.h>
#include <unistd.h>

#include <linux/unistd.h>
#include <linux/types.h>
#include <linux/ukevent.h>

/*
 * _syscallN(type, name, t1, a1, ...) expands to a function definition
 *     type name(t1 a1, ...)
 * whose body forwards its arguments to syscall(2) using the system-call
 * number __NR_<name>.  The expansion is a complete function definition,
 * so uses must NOT be followed by a semicolon (an extra ';' at file scope
 * is not valid ISO C).
 */
#define _syscall4(type, name, type1, arg1, type2, arg2, type3, arg3, type4, arg4) \
type name(type1 arg1, type2 arg2, type3 arg3, type4 arg4) \
{ \
	return syscall(__NR_##name, arg1, arg2, arg3, arg4); \
}

#define _syscall5(type, name, type1, arg1, type2, arg2, type3, arg3, type4, arg4, \
		  type5, arg5) \
type name(type1 arg1, type2 arg2, type3 arg3, type4 arg4, type5 arg5) \
{ \
	return syscall(__NR_##name, arg1, arg2, arg3, arg4, arg5); \
}

/*
 * kevent_ctl(fd, cmd, num, events): issue control command `cmd`
 * (e.g. KEVENT_CTL_ADD) for `num` struct ukevent entries at `events`
 * on the kevent descriptor `fd`.  Returns the raw syscall result
 * (-1 with errno set on failure).
 */
_syscall4(int, kevent_ctl, int, fd, unsigned int, cmd, unsigned int, num, void *, events)

/*
 * kevent_get_events(fd, min_nr, max_nr, timeout, events): fetch ready
 * events into `events` and return how many were stored (-1 with errno
 * set on failure).
 * NOTE(review): presumably blocks until at least `min_nr` events are ready
 * or `timeout` expires, and stores at most `max_nr`; the unit of `timeout`
 * (callers pass 1000000000000ULL) is presumably nanoseconds -- confirm
 * against the kevent interface.
 */
_syscall5(int, kevent_get_events, int, fd, unsigned int, min_nr, unsigned int, max_nr, __u64, timeout, void *, events)

/* Streams requested with -n; lowered to the number actually set up. */
int nbhandles = 1024;
/* Test duration in seconds: the statistics timer stops after this many ticks. */
int time_test = 15;

/* Reads that returned data (the "events" that are counted and reported). */
unsigned long nbhandled;
/* Number of kevent_get_events() calls made so far ... */
unsigned long epw_samples;
/* ... and the total number of events those calls returned. */
unsigned long epw_samples_cnt;

/* One stream: the worker writes to fd[1], the main thread reads fd[0]. */
struct pipefd {
	int fd[2];
};
/* Array of nbhandles streams, allocated at startup. */
struct pipefd *tab;

/* Descriptor of /dev/kevent (not opened in -f mode). */
int epoll_fd;
/* Command-line switches: -f, -u and -i respectively. */
int fflag;
int afunix;
int afinet;

/*
 * Register `fd` with the kevent descriptor (epoll_fd) for receive events,
 * using the KEVENT_REQ_ET request flag (presumably edge-triggered).
 * Socket descriptors (-u / -i) are registered as KEVENT_SOCKET, pipes as
 * KEVENT_PIPE.
 *
 * Returns the kevent_ctl() result on every path:
 *   < 0  failure (the error is printed with perror),
 *   0    the request was queued,
 *   > 0  kevent reported the descriptor ready immediately (details printed).
 */
static int epoll_test_kevent_add(int fd)
{
	struct ukevent ev;
	int err;

	memset(&ev, 0, sizeof(ev));
	ev.type = (afinet || afunix) ? KEVENT_SOCKET : KEVENT_PIPE;
	ev.event = KEVENT_SOCKET_RECV;
	ev.req_flags = KEVENT_REQ_ET;
	ev.id.raw[0] = fd;

	err = kevent_ctl(epoll_fd, KEVENT_CTL_ADD, 1, &ev);
	if (err < 0) {
		perror("kevent_ctl: ADD");
	} else if (err > 0) {
		/* ev presumably carries the ready state written back by kevent_ctl(). */
		printf("Ready immediately: fd: %d, err: %d, ret_flags: %08x, ret_data: %d %d.\n", 
				ev.id.raw[0], err, ev.ret_flags, ev.ret_data[0], ev.ret_data[1]);
	}
	/* Previously the success path fell off the end of a non-void function. */
	return err;
}

/*
 * Create a TCP listener bound to an ephemeral port on the loopback address.
 * On success returns the listening descriptor and stores its bound address
 * in *addr; on failure prints the error and returns -1.
 */
static int open_tcp_listener(struct sockaddr_in *addr)
{
	socklen_t namelen = sizeof(*addr);
	int sock = socket(AF_INET, SOCK_STREAM, 0);

	if (sock == -1) {
		perror("socket");
		return -1;
	}
	memset(addr, 0, sizeof(*addr));
	addr->sin_family = AF_INET;
	addr->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	addr->sin_port = 0;	/* let the kernel choose a free port */
	if (bind(sock, (struct sockaddr *)addr, sizeof(*addr)) == -1) {
		perror("bind");
		goto fail;
	}
	if (listen(sock, 256) == -1) {
		perror("listen");
		goto fail;
	}
	/* Learn which port the kernel picked. */
	if (getsockname(sock, (struct sockaddr *)addr, &namelen) == -1) {
		perror("getsockname");
		goto fail;
	}
	return sock;
fail:
	close(sock);
	return -1;
}

/*
 * Open one TCP connection to the listener at *addr.
 * fd[0] receives the connecting socket (read end), fd[1] the accepted
 * socket (write end).  Returns 0 on success, or -1 with no descriptor
 * left open.
 */
static int open_tcp_pair(int listen_sock, const struct sockaddr_in *addr, int fd[2])
{
	int on = 1;
	int off = 0;

	fd[0] = socket(AF_INET, SOCK_STREAM, 0);
	if (fd[0] == -1)
		return -1;
	/*
	 * Connect in non-blocking mode: EINPROGRESS is expected, but an
	 * immediate success is also fine (it used to be treated as failure).
	 */
	ioctl(fd[0], FIONBIO, &on);
	if (connect(fd[0], (const struct sockaddr *)addr, sizeof(*addr)) == -1 &&
	    errno != EINPROGRESS)
		goto fail;
	fd[1] = accept(listen_sock, NULL, NULL);
	if (fd[1] == -1)
		goto fail;
	/* Back to blocking mode for the benchmark reads. */
	ioctl(fd[0], FIONBIO, &off);
	return 0;
fail:
	close(fd[0]);
	fd[0] = -1;
	return -1;
}

/*
 * Open up to nbhandles streams into tab[]: pipes by default, AF_UNIX
 * socket pairs with -u, loopback TCP connections with -i.  Unless -f is
 * set, /dev/kevent is opened and every read end is registered with it.
 *
 * Stops early when a stream cannot be created (typically the descriptor
 * limit, see ulimit -n) and lowers nbhandles to the count actually set up.
 * Returns 0 when at least one stream is usable, -1 otherwise.
 */
static int alloc_streams(void)
{
	struct sockaddr_in addr;
	int listen_sock = -1;
	int i;

	if (nbhandles <= 0) {
		fprintf(stderr, "invalid number of handles: %d\n", nbhandles);
		return -1;
	}
	if (!fflag) {
		epoll_fd = open("/dev/kevent", O_RDWR);
		if (epoll_fd == -1) {
			perror("open /dev/kevent");
			return -1;
		}
	}
	tab = calloc((size_t)nbhandles, sizeof(*tab));
	if (tab == NULL) {
		perror("calloc");
		return -1;
	}
	if (afinet) {
		listen_sock = open_tcp_listener(&addr);
		if (listen_sock == -1) {
			free(tab);
			tab = NULL;
			return -1;
		}
	}
	for (i = 0; i < nbhandles; i++) {
		int rc;

		if (afinet)
			rc = open_tcp_pair(listen_sock, &addr, tab[i].fd);
		else if (afunix)
			rc = socketpair(AF_UNIX, SOCK_STREAM, 0, tab[i].fd);
		else
			rc = pipe(tab[i].fd);
		if (rc == -1)
			break;
		if (!fflag)
			epoll_test_kevent_add(tab[i].fd[0]);
	}
	/* All connections have been accepted; the listener is no longer needed. */
	if (listen_sock != -1)
		close(listen_sock);
	nbhandles = i;
	printf("%d handles setup\n", nbhandles);
	return nbhandles > 0 ? 0 : -1;
}

/*
 * Read the system-wide context-switch counter (the "ctxt" line of
 * /proc/stat) into *ctxt.
 *
 * Only open/read/close and plain string scanning are used (no stdio)
 * because the function is called from the SIGALRM handler.  The file is
 * scanned in chunks, so the counter is found even when earlier lines
 * (such as the long "intr" line) push it past the first 4 KiB.
 *
 * Returns 0 on success.  Returns -1 if the file cannot be opened or has no
 * "ctxt" line; *ctxt is 0 in that case.
 */
int sample_proc_stat(long *ctxt)
{
	static const char key[] = "\nctxt";
	const size_t keylen = sizeof(key) - 1;
	char buffer[4096 + 1];	/* +1 keeps room for the NUL terminator */
	size_t have = 0;
	char *p = NULL;
	ssize_t lu;
	int fd;

	*ctxt = 0;
	buffer[0] = '\0';
	fd = open("/proc/stat", O_RDONLY);
	if (fd == -1) {
		perror("/proc/stat");
		return -1;
	}
	/* Read at most sizeof(buffer) - 1 bytes so buffer[have] stays in bounds. */
	while ((lu = read(fd, buffer + have, sizeof(buffer) - 1 - have)) > 0) {
		have += (size_t)lu;
		buffer[have] = '\0';
		p = strstr(buffer, key);
		if (p != NULL && strchr(p + keylen, '\n') != NULL)
			break;	/* the complete "ctxt" line is in the buffer */
		if (p != NULL) {
			/* The line is cut off by the chunk end: keep it, read more. */
			have -= (size_t)(p - buffer);
			memmove(buffer, p, have);
		} else if (have >= keylen) {
			/* Keep a tail that could hold the start of a split match. */
			memmove(buffer, buffer + have - (keylen - 1), keylen - 1);
			have = keylen - 1;
		}
		buffer[have] = '\0';
		p = NULL;
	}
	close(fd);
	if (p == NULL)
		p = strstr(buffer, key);	/* e.g. last line without '\n' */
	if (p == NULL)
		return -1;
	*ctxt = atol(p + keylen);
	return 0;
}


/*
 * SIGALRM handler, invoked once per second by the timer from timer_setup().
 *
 * Writes one line to stderr with:
 *   - the number of events handled since the previous tick,
 *   - the context-switch rate from /proc/stat (once two valid samples exist),
 *   - the average number of events per kevent_get_events() call since start.
 * After time_test ticks it prints the overall average rate and exits.
 */
static void timer_func(int sig)
{
	static unsigned long old;
	static long oldctxt;
	static int alarm_events;
	char buffer[128];
	unsigned long delta = nbhandled - old;
	long ctxt = 0;	/* sample_proc_stat() takes a long *, not unsigned long * */
	size_t len;

	(void)sig;
	old = nbhandled;
	/* Worst case for all three fields plus '\n' is 91 bytes: no overflow. */
	len = (size_t)sprintf(buffer, "%lu evts/sec", delta);
	if (sample_proc_stat(&ctxt) != 0)
		ctxt = 0;
	/*
	 * Report only between two valid samples; a failed sample (0) used to
	 * yield a wrapped-around, huge unsigned delta.
	 */
	if (oldctxt > 0 && ctxt > oldctxt)
		len += (size_t)sprintf(buffer + len, " %lu ctxt/sec",
				       (unsigned long)(ctxt - oldctxt));
	oldctxt = ctxt;
	if (epw_samples)
		len += (size_t)sprintf(buffer + len, " %g samples per call",
				       (double)epw_samples_cnt / (double)epw_samples);
	buffer[len++] = '\n';
	write(2, buffer, len);
	if (++alarm_events >= time_test) {
		len = (size_t)sprintf(buffer, "Avg: %lu evts/sec\n",
				      nbhandled / alarm_events);
		write(2, buffer, len);
		/* NOTE: exit() is not async-signal-safe; kept so stdio buffers get flushed. */
		exit(0);
	}
}

/*
 * Install timer_func() as the SIGALRM handler and arm a periodic
 * ITIMER_REAL timer that fires every second, starting one second from now.
 *
 * Both steps are required for the benchmark to report results and stop,
 * so a failure in either is fatal.
 */
static void timer_setup(void)
{
	struct sigaction sg;
	struct itimerval it;

	memset(&sg, 0, sizeof(sg));
	sg.sa_handler = timer_func;
	sigemptyset(&sg.sa_mask);
	/* sa_flags is 0 (no SA_RESTART), so blocking calls may fail with EINTR. */
	if (sigaction(SIGALRM, &sg, NULL) == -1) {
		perror("sigaction");
		exit(EXIT_FAILURE);
	}

	it.it_interval.tv_sec = 1;
	it.it_interval.tv_usec = 0;
	it.it_value.tv_sec = 1;
	it.it_value.tv_usec = 0;
	if (setitimer(ITIMER_REAL, &it, NULL) == -1) {
		perror("setitimer");
		exit(EXIT_FAILURE);
	}
}

/*
 * Worker thread body: forever write a single byte to each stream's write
 * end (tab[].fd[1]) in round-robin order, yielding the CPU after every
 * full pass over the nbhandles streams.  Never returns.
 *
 * The thread first lowers its priority with nice(10); on Linux the nice
 * value is per-thread, so only this thread is affected.
 */
static void *worker_thread_func(void *arg)
{
	const char c = 1;
	int fd = -1;
	int cnt = 0;

	(void)arg;
	/* nice() may legitimately return -1, so errno tells failure apart. */
	errno = 0;
	if (nice(10) == -1 && errno != 0)
		perror("nice");	/* best effort: keep running at normal priority */
	for (;;) {
		fd = (fd + 1) % nbhandles;
		write(tab[fd].fd[1], &c, 1);
		if (++cnt >= nbhandles) {
			cnt = 0;
			sched_yield();	/* relax :) -- POSIX replacement for pthread_yield() */
		}
	}
}

/*
 * Print the command-line synopsis on stderr, then terminate the process
 * with exit status `code`.  Does not return.
 */
void usage(int code)
{
	static const char synopsis[] =
		"Usage : epoll_bench [-n num] [-{u|i}] [-f] [-t duration] [-l limit] [-e maxepoll]\n";

	fputs(synopsis, stderr);
	exit(code);
}

int main(int argc, char *argv[])
{
	char buff[1024];
	pthread_t tid;
	int c, fd;
	int limit = 1000;
	int max_epoll = 1024;

	while ((c = getopt(argc, argv, "fuin:l:e:t:")) != EOF) {
		if (c == 'n') nbhandles = atoi(optarg);
		else if (c == 'f') fflag++;
		else if (c == 'l') limit = atoi(optarg);
		else if (c == 'e') max_epoll = atoi(optarg);
		else if (c == 't') time_test = atoi(optarg);
		else if (c == 'u') afunix++;
		else if (c == 'i') afinet++;
		else usage(1);
	}
	alloc_streams();
	pthread_create(&tid, NULL, worker_thread_func, (void *)0);
	timer_setup();

	if (fflag) {
		for (fd = 0;;fd = (fd + 1) % nbhandles) {
			if (read(tab[fd].fd[0], buff, 1024) > 0)
				nbhandled++;
			}
		}
	else {
		struct ukevent *events;
		events = malloc(sizeof(struct ukevent) * max_epoll);
		for (;;) {
			int nb = kevent_get_events(epoll_fd, 1, max_epoll, 1000000000000ULL, events);
			int i;
			epw_samples++;
			epw_samples_cnt += nb;
			for (i = 0 ; i < nb ; i++) {
				fd = events[i].id.raw[0];
				if (read(fd, buff, 1024) > 0)
					nbhandled++;
			}
			if (nb < limit)
				pthread_yield();
		}
	}
}
