#include <sys/types.h>
#include <sys/socket.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/poll.h>
#include <sys/sendfile.h>
#include <sys/epoll.h>

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <time.h>
#include <ctype.h>
#include <netdb.h>
#include <sched.h>

#include <linux/types.h>
#include <linux/unistd.h>
#include <linux/kevent.h>

/*
 * Debug logging.
 *
 * Uncomment the DEBUG define below to make ulog() print to stderr;
 * otherwise ulog() expands to an empty statement and its arguments
 * are not evaluated.
 */
//#define DEBUG

#ifdef DEBUG
#define ulog(f, a...) fprintf(stderr, f, ##a)
#else
#define ulog(f, a...) do {} while (0)
#endif
/*
 * Same as ulog(), but appends ": <strerror(errno)> [<errno>].\n" to the
 * message, so the format passed in should not end with a newline.
 */
#define ulog_err(f, a...) ulog(f ": %s [%d].\n", ##a, strerror(errno), errno)

/* Number of entries in the pfd[] and pfd_ready[] descriptor tables. */
#define MAX_DESC_NUM	100

/* Page geometry helpers: 4096-byte pages (1 << 12). */
#define PAGE_SHIFT	12
#define PAGE_SIZE	(0x1 << PAGE_SHIFT)
#define PAGE_MASK	(~(PAGE_SIZE-1))
/* Offset of address/value p within its page. */
#define offset_in_page(p)	((unsigned long)(p) & ~PAGE_MASK)

/*
 * Presumably expands to the userspace stub
 *   int kevent_ctl(int arg1, void *argv2);
 * via the legacy _syscall2() helper macro -- TODO confirm against the
 * kernel headers this file is built with.
 */
_syscall2(int, kevent_ctl, int, arg1, void *, argv2);

/*
 * Test modes selectable with -T; the values index type_table[].
 * TYPE_ASYNC drives the sockets through kevent_ctl(), TYPE_SYNC through
 * epoll and send(). __TYPE_MAX is the number of valid modes.
 */
enum {
	TYPE_ASYNC = 0,
	TYPE_SYNC,
	__TYPE_MAX
};

/* Payload buffer sent to every connection; main() fills it with 'A'. */
static char client_buf[4096];
/*
 * out_fd:        only referenced from code that is currently under #if 0.
 * kevent_ctl_fd: control descriptor; holds the kevent descriptor in async
 *                mode and the epoll descriptor in sync mode.
 * need_exit:     set to the signal number by sig_handler(); the test loops
 *                stop once it is non-zero.
 */
static int out_fd, kevent_ctl_fd, need_exit;
/* Argument passed to alarm() between statistics reports. */
static int alarm_timeout=1;
/* Running byte counter reported by print_stat(). */
static unsigned long written = 0;
/* tm1: time taken in main() before connecting; tm2: time of the latest report. */
static struct timeval tm1, tm2;
/* Baseline values print_stat() subtracts from the utime/stime it reads. */
static unsigned long init_utime, init_stime;

/* pfd[i].data.fd holds the i-th connected socket. */
static struct epoll_event pfd[MAX_DESC_NUM];
/* Output array handed to epoll_wait() in sync mode. */
static struct epoll_event pfd_ready[MAX_DESC_NUM];

/* Printable names of the test modes, indexed by TYPE_*. */
static char * type_table[] = {
	[TYPE_ASYNC] = "async",
	[TYPE_SYNC] = "sync",
};

/* Forward declaration: naio_send() and naio_init() call each other. */
static int naio_init(int fd, unsigned int events);

/*
 * Read the CPU time consumed so far by this process from /proc/<pid>/stat.
 *
 * @sys_utime: receives the "utime" field (time spent in user mode).
 * @sys_stime: receives the "stime" field (time spent in kernel mode).
 *
 * Both values are reported by the kernel in clock ticks.
 *
 * Returns 0 on success, 1 if the stat file cannot be opened and -1 if it
 * cannot be read or parsed. The output arguments are written only on success.
 */
static int cpu_usage(unsigned long *sys_utime, unsigned long *sys_stime)
{
	FILE *f;
	char path[64];
	char *ptr, data[256], status;
	char fmt[] = " %c %d %d %d %d %d %lu %lu %lu %lu %lu %lu %lu ";
	int num, good_num = 13;
	int ret = -1;
	int ppid, pgrp, session, tty, tpgid;
	unsigned long flags, minflt, cminflt, majflt, cmajflt, utime, stime;

	snprintf(path, sizeof(path), "/proc/%d/stat", getpid());

	f = fopen(path, "r");
	if (!f) {
		ulog_err("Failed to open %s", path);
		return 1;
	}

	if (!fgets(data, sizeof(data), f)) {
		ulog_err("Failed to read data");
		goto out_close;
	}

	/*
	 * Parsing starts right after the last ')' on the line, i.e. after the
	 * parenthesised command name, so that whatever the name contains
	 * cannot confuse sscanf().
	 */
	ptr = strrchr(data, ')');
	if (!ptr || !ptr[1]) {
		ulog("String '%s' is broken.\n", data);
		goto out_close;
	}
	ptr++;

	num = sscanf(ptr, fmt, &status, &ppid, &pgrp,
				&session, &tty, &tpgid, &flags,
				&minflt, &cminflt, &majflt, &cmajflt,
				&utime, &stime);

	if (num != good_num) {
		ulog("String '%s' is broken, num=%d, good_num=%d.\n", ptr, num, good_num);
		goto out_close;
	}

	*sys_utime = utime;
	*sys_stime = stime;
	ret = 0;

	/* Success and failure share the cleanup so the stream is never leaked. */
out_close:
	fclose(f);
	return ret;
}

/*
 * Try to give the process real-time scheduling (SCHED_FIFO, priority 19)
 * and the highest nice level (-20) for its process group.
 *
 * Both steps are best effort: a failure is only logged.
 *
 * Returns the result of setpriority(): 0 on success, -1 on failure.
 */
static int set_prio(void)
{
	struct sched_param p;
	int err;

	memset(&p, 0, sizeof(p));
	p.sched_priority = 19;

	if (sched_setscheduler(0, SCHED_FIFO, &p))
		ulog_err("%s: Failed to switch to SCHED_FIFO", __func__);

	err = setpriority(PRIO_PGRP, 0, -20);
	if (err)
		ulog("%s: Failed to change priority.\n", __func__);

	return err;
}

/*
 * Signal handler: records the received signal number in need_exit so the
 * test loops, which poll that flag, terminate.
 *
 * NOTE(review): need_exit is a plain static int; volatile sig_atomic_t is
 * the portable type for a flag written from a signal handler.
 */
static void sig_handler(int signo)
{
	need_exit = signo;
}

/*
 * Print throughput and CPU usage accumulated since tm1 was taken.
 *
 * Reads the current time into tm2, derives the average transfer speed from
 * the global byte counter `written`, and reports user/kernel CPU usage
 * relative to the init_utime/init_stime baseline.
 *
 * NOTE(review): this is also called from the SIGALRM handler and uses
 * printf()/fopen(), which are not async-signal-safe.
 */
static void print_stat(void)
{
	unsigned long stime = 0, utime = 0;
	long diff, mdiff;
	double speed;

	gettimeofday(&tm2, NULL);
	diff = (tm2.tv_sec - tm1.tv_sec)*1000000 + tm2.tv_usec - tm1.tv_usec;
	/* Guard the divisions below against a zero (or backwards) interval. */
	if (diff <= 0)
		diff = 1;
	mdiff = diff/1000;
	if (mdiff <= 0)
		mdiff = 1;
	speed = ((double)written)*1000000.0/((double)diff*1024.0*1024.0);

	if (cpu_usage(&utime, &stime)) {
		/* No fresh sample: report zero usage instead of a wrapped delta. */
		utime = init_utime;
		stime = init_stime;
	}

	utime -= init_utime;
	stime -= init_stime;

	/*
	 * USER_HZ is 100 ticks per second, so ticks * 10 is CPU time in msecs
	 * and (ticks * 10 / elapsed_msecs) * 100 == ticks * 1000 / elapsed_msecs
	 * is the CPU usage in percent over the elapsed interval.
	 */
	utime = utime*1000/mdiff;
	stime = stime*1000/mdiff;

	printf("Written %lu Mb, time %f sec, speed %f Mb/sec, CPU usage user: %3lu, kernel: %3lu.\n", 
			written/1024/1024, ((double)diff)/1000000.0, speed, utime, stime);

	/*
	 * The baseline is deliberately left untouched: `written` and the
	 * elapsed time are both measured from the start of the run, so CPU
	 * usage is averaged over the same interval. Storing the scaled
	 * percentages back into init_utime/init_stime would mix units and
	 * corrupt every later report.
	 */
}

/*
 * SIGALRM handler: prints the current statistics and re-arms the alarm so
 * that it fires again after alarm_timeout seconds.
 *
 * NOTE(review): print_stat() calls printf(), which is not
 * async-signal-safe; confirm this is acceptable for this test tool.
 */
static void alarm_handler(int signo)
{
	print_stat();

	alarm(alarm_timeout);
}

/*
 * Print the command line synopsis to stderr.
 *
 * @p: program name to show in the synopsis (normally argv[0]).
 *
 * Written with fprintf() rather than ulog() so that the help text is
 * visible in builds without DEBUG, where ulog() expands to nothing.
 */
static void usage(char *p)
{
	fprintf(stderr, "Usage: %s -a addr -p port -f kevent_path -F file -T type -N client_num "
			"-t timeout -n wait_num\n", p);
}

/*
 * Create a TCP socket and connect it to addr:port.
 *
 * @addr: host name or dotted IPv4 address to connect to; must not be NULL.
 * @port: TCP port in host byte order.
 * @type: TYPE_ASYNC additionally requests SO_ASYNC_SOCK before connecting
 *        and switches the connected socket to non-blocking mode; any other
 *        value leaves the socket blocking.
 *
 * Returns the connected socket descriptor, or -1 on failure. Failing to
 * apply the async-only socket settings is logged but is not fatal.
 */
static int naio_socket_init(char *addr, unsigned short port, int type)
{
	struct hostent *h;
	int s, on, flags;
	struct sockaddr_in sa;

	if (!addr) {
		ulog("%s: Connect address cannot be NULL.\n", __func__);
		return -1;
	}

	h = gethostbyname(addr);
	if (!h) {
		/* gethostbyname() reports failures via h_errno, not errno. */
		ulog("%s: Failed to get address of %s: h_errno=%d.\n", __func__, addr, h_errno);
		return -1;
	}

	/* Only copy the address if it really is an IPv4 address of the expected size. */
	if (h->h_addrtype != AF_INET || !h->h_addr_list[0] ||
	    h->h_length != (int)sizeof(sa.sin_addr.s_addr)) {
		ulog("%s: %s did not resolve to an IPv4 address.\n", __func__, addr);
		return -1;
	}

	s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (s == -1) {
		ulog_err("%s: Failed to create server socket", __func__);
		return -1;
	}

	if (type == TYPE_ASYNC) {
		on = 1;
		if (setsockopt(s, SOL_SOCKET, SO_ASYNC_SOCK, &on, sizeof(on)))
			ulog_err("%s: Failed to set SO_ASYNC_SOCK", __func__);
	}

	memset(&sa, 0, sizeof(sa));
	memcpy(&sa.sin_addr.s_addr, h->h_addr_list[0], sizeof(sa.sin_addr.s_addr));
	sa.sin_port = htons(port);
	sa.sin_family = AF_INET;

	if (connect(s, (struct sockaddr *)&sa, sizeof(struct sockaddr_in)) == -1) {
		ulog_err("%s: Failed to connect to %s", __func__, addr);
		close(s);
		return -1;
	}

	if (type == TYPE_ASYNC) {
		/* Add O_NONBLOCK to the existing status flags instead of replacing them. */
		flags = fcntl(s, F_GETFL);
		if (flags == -1 || fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1)
			ulog_err("%s: Failed to switch socket to non-blocking mode", __func__);
	}

	return s;
}

/*
 * Submit a KEVENT_CTL_REMOVE request for one event.
 *
 * @event: event description; it is copied verbatim into the request.
 *
 * The request is a kevent_user_control header followed directly by a
 * single ukevent, built in a stack buffer and passed to kevent_ctl().
 *
 * Returns the kevent_ctl() result (negative on failure).
 */
static int naio_remove(struct ukevent *event)
{
	char req[sizeof(struct kevent_user_control) + sizeof(struct ukevent)];
	struct kevent_user_control *hdr = (struct kevent_user_control *)req;
	struct ukevent *slot = (struct ukevent *)(hdr + 1);
	int rc;

	hdr->cmd = KEVENT_CTL_REMOVE;
	hdr->num = 1;
	hdr->timeout = 1000;

	memcpy(slot, event, sizeof(*slot));

	rc = kevent_ctl(kevent_ctl_fd, req);
	if (rc < 0)
		ulog_err("Failed to perform control REMOVE operation");

	return rc;
}

/*
 * Handle one completed/returned kevent for a socket.
 *
 * @e: event as returned by kevent_ctl(); id.raw[0] carries the socket
 *     descriptor. id.raw[1] is subtracted from sizeof(client_buf) to get
 *     the byte count added to `written` -- presumably the part of the
 *     buffer that was not transferred; TODO confirm against the kevent
 *     NAIO documentation.
 *
 * If the event is flagged KEVENT_RET_BROKEN it is removed, the socket is
 * closed and -ENODEV is returned. Otherwise a new send request is queued
 * with naio_init() and its result is returned.
 *
 * NOTE(review): naio_init() calls back into this function when a request
 * completes immediately, so the two recurse for as long as sends keep
 * completing synchronously.
 */
static int naio_send(struct ukevent *e)
{
	int s = e->id.raw[0];
	struct timeval tm;
	int err;
	unsigned int size = sizeof(client_buf) - e->id.raw[1];
	
	gettimeofday(&tm, NULL);

	written += size;

	/* written is unsigned long, so it must be printed with %lu. */
	ulog("%s: written=%lu, size=%u.\n", __func__, written, size);
	if (e->ret_flags & KEVENT_RET_BROKEN) {
		ulog("%08lu:%06lu: kevent is broken.\n", tm.tv_sec, tm.tv_usec);
		naio_remove(e);
		close(s);
		return -ENODEV;
	}
#if 0
	err = read(out_fd, client_buf, sizeof(client_buf));
	if (err <= 0) {
		naio_remove(e);
		close(s);
		return -ENODEV;
	}
#endif
	err = naio_init(s, KEVENT_SOCKET_SEND);

	return err;
}

/*
 * Queue a one-shot KEVENT_NAIO request on a socket.
 *
 * @fd:     socket descriptor, stored in id.raw[0] of the request.
 * @events: event mask stored in the request (callers pass
 *          KEVENT_SOCKET_SEND).
 *
 * The request points at client_buf and carries sizeof(client_buf) in
 * id.raw[1]. If kevent_ctl() reports the event as already done
 * (KEVENT_RET_DONE), it is passed straight to naio_send().
 *
 * Returns a negative value if kevent_ctl() fails; otherwise the
 * kevent_ctl() result, or the naio_send() result when the request
 * completed immediately.
 */
static int naio_init(int fd, unsigned int events)
{
	char req[sizeof(struct kevent_user_control) + sizeof(struct ukevent)];
	struct kevent_user_control *hdr = (struct kevent_user_control *)req;
	struct ukevent *ev = (struct ukevent *)(hdr + 1);
	int rc;

	/* Start from an all-zero request so unset fields are well defined. */
	memset(req, 0, sizeof(req));

	hdr->cmd = KEVENT_CTL_ADD;
	hdr->num = 1;
	hdr->timeout = 1000;

	ev->type = KEVENT_NAIO;
	ev->event = events;
	ev->req_flags = KEVENT_REQ_ONESHOT;
	ev->ptr = client_buf;
	ev->id.raw[0] = fd;
	ev->id.raw[1] = sizeof(client_buf);

	rc = kevent_ctl(kevent_ctl_fd, req);

	ulog("%s: err=%d, fd=%d.\n", __func__, rc, fd);

	if (rc < 0) {
		ulog_err("Failed to perform control ADD operation: fd=%d, events=%08x", fd, events);
		return rc;
	}

	/* The request may already have been completed by the ADD call itself. */
	if (hdr->num && (ev->ret_flags & KEVENT_RET_DONE))
		rc = naio_send(ev);

	return rc;
}

static int naio_wait(unsigned int timeout, unsigned int wait_num)
{
	int num, err = 0;
	struct kevent_user_control *ctl;
	struct ukevent *uk;
	struct timeval tm;
	int i;
	char buf[4096];

	ctl = (struct kevent_user_control *)buf;
	uk = (struct ukevent *)(ctl + 1);

	ctl->num = wait_num;
	ctl->timeout = timeout;
	ctl->cmd = KEVENT_CTL_WAIT;

	err = kevent_ctl(kevent_ctl_fd, buf);
	if (err < 0) {
		ulog_err("Failed to perform control operation");
		return err;
	}
	num = ctl->num;

	gettimeofday(&tm, NULL);

	ulog("%08lu.%06lu: Wait: num=%d, ctl->num=%u.\n", tm.tv_sec, tm.tv_usec, num, ctl->num);
	uk = (struct ukevent *)(ctl+1);
	for (i=0; i<num; ++i) {

		ulog("%3u - %3u: id:%08x.%08x, ret_data:%08x.%08x, req_flags:%08x, ret_flags:%08x.\n", 
			i, ctl->num,
			uk[i].id.raw[0], uk[i].id.raw[1],
			uk[i].ret_data[0], uk[i].ret_data[1],
			uk[i].req_flags, uk[i].ret_flags);

			err = naio_send(&uk[i]);
			if (err < 0)
				break;
	}

	return err;
}

/*
 * Synchronous test loop: epoll for writability and send() client_buf.
 *
 * @num: number of sockets in pfd[] to register with the epoll descriptor
 *       stored in kevent_ctl_fd.
 *
 * Every socket is registered once for EPOLLOUT. The registration is not
 * one-shot, so it stays armed and needs no re-adding after each send.
 * The loop runs until need_exit becomes non-zero or an error occurs, and
 * adds the bytes accepted by send() to `written`.
 *
 * Returns 0 when stopped via need_exit, a negative value on failure.
 */
static int sync_test(int num)
{
	int err, i, nready;
	ssize_t sent;

	for (i=0; i<num; ++i) {
		pfd[i].events = EPOLLOUT;
		err = epoll_ctl(kevent_ctl_fd, EPOLL_CTL_ADD, pfd[i].data.fd, &pfd[i]);
		if (err < 0)
			return err;
	}

	while (!need_exit) {
		nready = epoll_wait(kevent_ctl_fd, pfd_ready,
				sizeof(pfd_ready)/sizeof(pfd_ready[0]), -1);
		if (nready < 0) {
			/* Interrupted by SIGALRM/SIGINT: re-check need_exit and retry. */
			if (errno == EINTR)
				continue;
			ulog_err("%s: epoll_wait failed", __func__);
			return nready;
		}

		/*
		 * The ready count and the send() result live in separate
		 * variables so that a send cannot change the loop bound.
		 */
		for (i=0; i<nready; ++i) {
			sent = send(pfd_ready[i].data.fd, client_buf, sizeof(client_buf), 0);
			if (sent < 0 && errno == EINTR)
				continue;
			if (sent <= 0) {
				ulog_err("%s: send failed on fd=%d", __func__, pfd_ready[i].data.fd);
				return -1;
			}
			written += sent;
		}
	}

	return 0;
}

/*
 * Asynchronous test loop: queue one send request per socket, then keep
 * waiting for and processing completions.
 *
 * @num:      number of sockets in pfd[] to initialise.
 * @timeout:  passed through to naio_wait().
 * @wait_num: passed through to naio_wait().
 *
 * Runs until need_exit becomes non-zero or a call fails.
 *
 * Returns the last naio_init()/naio_wait() result (negative on failure),
 * or 0 if nothing was executed at all.
 */
static int async_test(int num, unsigned int timeout, unsigned int wait_num)
{
	/* Initialised so the return value is defined even if no call runs. */
	int err = 0, i;

	for (i=0; i<num; ++i) {
		ulog("Initialising fd=%d.\n", pfd[i].data.fd);
		err = naio_init(pfd[i].data.fd, KEVENT_SOCKET_SEND);
		if (err < 0)
			return err;
	}

	while (!need_exit) {
		err = naio_wait(timeout, wait_num);
		if (err < 0)
			break;
	}

	return err;
}

int main(int argc, char *argv[])
{
	int ch, s, err, type = TYPE_ASYNC, i, num = 1;
	char *addr, *kevent_path, *out_path;
	short port;
	unsigned int timeout, wait_num;
	double speed;
	long diff;

	kevent_path = out_path = NULL;
	addr = NULL;
	port = -1;
	timeout = 1000;
	wait_num = 1;

	while ((ch = getopt(argc, argv, "N:n:t:f:a:p:F:T:h")) > 0) {
		switch (ch) {
			case 'N':
				num = atoi(optarg);
				break;
			case 't':
				timeout = atoi(optarg);
				break;
			case 'n':
				wait_num = atoi(optarg);
				break;
			case 'f':
				kevent_path = optarg;
				break;
			case 'a':
				addr = optarg;
				break;
			case 'T':
				type = atoi(optarg);
				break;
			case 'p':
				port = atoi(optarg);
				break;
			case 'F':
				out_path = optarg;
				break;
			default:
				usage(argv[0]);
				return -1;
		}
	}

	if ((type == TYPE_ASYNC && !kevent_path) || !addr || port == -1 || !out_path || type >= __TYPE_MAX) {
		ulog("You must specify at least -f, -a, -p and -F parameters.\n");
		usage(argv[0]);
		return -1;
	}
	
	if (type == TYPE_ASYNC) {
		struct kevent_user_control ctl;
		ctl.cmd = KEVENT_CTL_INIT;
		ctl.num = ctl.timeout = 0;
		kevent_ctl_fd = kevent_ctl(0, &ctl);
		if (kevent_ctl_fd < 0) {
			ulog_err("Failed to open kevent control file '%s'", kevent_path);
			return -1;
		}
	} else {
		kevent_ctl_fd = epoll_create(num);
		if (kevent_ctl_fd == -1) {
			ulog_err("Failed to create epoll control descriptor");
			return -1;
		}
	}
#if 0
	out_fd = open(out_path, O_RDWR|O_CREAT|O_TRUNC, 0644);
	if (out_fd == -1) {
		ulog_err("Failed to open output file '%s'", out_path);
		return -1;
	}
#endif
	signal(SIGINT, sig_handler);
	signal(SIGALRM, alarm_handler);
	//set_prio();

	memset(client_buf, 'A', sizeof(client_buf));
	
	printf("Using %s test with %d clients.\n", type_table[type], num);

	gettimeofday(&tm1, NULL);
	
	for (i=0; i<num; ++i) {
		pfd[i].data.fd = naio_socket_init(addr, port, type);
		if (pfd[i].data.fd < 0)
			return -1;
	}

	alarm(alarm_timeout);

	if (type == TYPE_ASYNC)
		async_test(num, timeout, wait_num);
	else
		sync_test(num);

	print_stat();

err_out_exit:
	return 0;
}
