You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1236 lines
31 KiB

Index: src/Makefile
Note: build glue for FreeBSD. OS, DEPS and LIBS use the `!=` shell assignment
(GNU make >= 4.0 or BSD make); on any OS other than FreeBSD the `test` fails,
so DEPS and LIBS expand to nothing and the shim is neither built nor linked.
The patch also drops -g -ggdb from CFLAGS and -rdynamic from LDFLAGS.
NOTE(review): epoll-shim/libepoll.a has no prerequisites, so edits to the shim
sources do not trigger a rebuild; clean the epoll-shim directory first.
--- src/Makefile.orig 2017-09-11 08:34:23.000000000 +0200
+++ src/Makefile 2017-10-31 00:49:08.314668000 +0100
@@ -2,8 +2,11 @@
CC=gcc
PROFILE?=
OPTIMIZATION?=-O2
-CFLAGS=-std=c99 -Wall $(OPTIMIZATION) $(PROFILE) -g -ggdb -D_GNU_SOURCE -I../deps/jemalloc/include
-LDFLAGS=-pthread -rdynamic $(PROFILE)
+CFLAGS=-std=c99 -Wall $(OPTIMIZATION) $(PROFILE) -D_GNU_SOURCE -I../deps/jemalloc/include
+LDFLAGS=-pthread $(PROFILE)
+OS!=uname -s
+DEPS!=test "$(OS)" = FreeBSD && echo "epoll-shim/libepoll.a"
+LIBS!=test "$(OS)" = FreeBSD && echo "epoll-shim/libepoll.a -lrt"
JEMALLOC=../deps/jemalloc/lib/libjemalloc.a
BIN:=$(NAME)
@@ -13,7 +16,8 @@
%.o: %.c $(JEMALLOC)
$(CC) $(CFLAGS) -c $<
-$(BIN): $(OBJ) $(JEMALLOC)
+$(BIN): $(OBJ) $(JEMALLOC) $(DEPS)
+ $(CC) $(LDFLAGS) -o $(BIN) $(OBJ) $(JEMALLOC) $(LIBS)
$(JEMALLOC):
$(MAKE) jemalloc -C ../deps
@@ -29,3 +33,7 @@
distclean:
$(MAKE) distclean -C ../deps
+
+epoll-shim/libepoll.a:
+ cd epoll-shim && $(MAKE) $(MFLAGS) libepoll.a
+
Index: src/config.c
Note: <linux/limits.h> exists only on Linux; other systems get the POSIX
<limits.h>. FreeBSD's <sys/limits.h> carries only the integer type limits and
lacks e.g. PATH_MAX (presumably what this file needs it for; confirm).
The bind option now holds "ip:port" (default 0.0.0.0:12345).
--- src/config.c.orig 2017-09-11 08:34:23.000000000 +0200
+++ src/config.c 2017-10-31 00:49:08.314890000 +0100
@@ -6,7 +6,11 @@
#include <stdlib.h>
#include <libgen.h>
#include <inttypes.h>
+#if defined(__linux__)
#include <linux/limits.h>
+#else
+#include <limits.h>
+#endif
#include "corvus.h"
#include "alloc.h"
#include "logging.h"
@@ -45,7 +49,8 @@
memset(config.cluster, 0, CLUSTER_NAME_SIZE + 1);
strncpy(config.cluster, "default", CLUSTER_NAME_SIZE);
- config.bind = 12345;
+ strncpy(config.bind.ip, "0.0.0.0", sizeof(config.bind.ip));
+ config.bind.port = 12345;
config.node = cv_calloc(1, sizeof(struct node_conf));
config.node->refcount = 1;
config.thread = DEFAULT_THREAD;
@@ -173,7 +178,7 @@
if (strlen(value) <= 0) return CORVUS_OK;
strncpy(config.cluster, value, CLUSTER_NAME_SIZE);
} else if (strcmp(name, "bind") == 0) {
- if (socket_parse_port(value, &config.bind) == CORVUS_ERR) {
+ if (socket_parse_ip(value, &config.bind) == CORVUS_ERR) {
return CORVUS_ERR;
}
} else if (strcmp(name, "syslog") == 0) {
@@ -294,7 +299,7 @@
if (strcmp(name, "cluster") == 0) {
strncpy(value, config.cluster, max_len);
} else if (strcmp(name, "bind") == 0) {
- snprintf(value, max_len, "%u", config.bind);
+ snprintf(value, max_len, "%s:%u", config.bind.ip, config.bind.port);
} else if (strcmp(name, "node") == 0) {
config_node_to_str(value, max_len);
} else if (strcmp(name, "thread") == 0) {
Index: src/config.h
Note: `bind` changes from a bare port (uint16_t) to a struct address whose
ip and port fields are filled in by config.c. struct address is declared
elsewhere (presumably socket.h; confirm). Also adds the missing final newline.
--- src/config.h.orig 2017-09-11 08:34:23.000000000 +0200
+++ src/config.h 2017-10-31 00:49:08.315004000 +0100
@@ -15,7 +15,7 @@
struct corvus_config {
char config_file_path[CONFIG_FILE_PATH_SIZE + 1];
char cluster[CLUSTER_NAME_SIZE + 1];
- uint16_t bind;
+ struct address bind;
struct node_conf *node;
int thread;
int loglevel;
@@ -47,4 +47,4 @@
int config_add(char *name, char *value);
bool config_option_changable(const char *option);
-#endif /* end of include guard: CONFIG_H */
\ No newline at end of file
+#endif /* end of include guard: CONFIG_H */
Index: src/corvus.c
Note: backtrace()/backtrace_symbols_fd() are compiled only on Linux. Elsewhere
the crash handler skips the stack dump and goes straight to restoring the
default signal handlers, and main() skips the backtrace pre-warm call. The
proxy binds to config.bind.ip instead of a hard-coded 0.0.0.0.
--- src/corvus.c.orig 2017-09-11 08:34:23.000000000 +0200
+++ src/corvus.c 2017-10-31 00:49:08.315175000 +0100
@@ -30,9 +30,11 @@
}
struct sigaction act;
+#if defined(__linux__)
void *trace[100];
int size = backtrace(trace, 100);
backtrace_symbols_fd(trace, size, STDOUT_FILENO);
+#endif
// restore default signal handlers
sigemptyset(&act.sa_mask);
@@ -276,7 +278,7 @@
exit(EXIT_FAILURE);
}
- if (proxy_init(&ctx->proxy, ctx, "0.0.0.0", config.bind) == -1) {
+ if (proxy_init(&ctx->proxy, ctx, config.bind.ip, config.bind.port) == -1) {
LOG(ERROR, "Fatal: fail to create proxy.");
exit(EXIT_FAILURE);
}
@@ -438,9 +440,11 @@
* https://www.scribd.com/doc/3726406/Crash-N-Burn-Writing-Linux-application-fault-handlers
* https://github.com/gby/libcrash/blob/master/crash.c#L279
*/
+#if defined(__linux__)
void *dummy_trace[1];
backtrace(dummy_trace, 1);
backtrace_symbols_fd(dummy_trace, 0, -1);
+#endif
if ((err = pthread_spin_init(&signal_lock, 0)) != 0) {
LOG(ERROR, "Fail to init spin lock: %s", strerror(err));
return EXIT_FAILURE;
@@ -503,7 +507,7 @@
stats_init();
}
- LOG(INFO, "serve at 0.0.0.0:%d", config.bind);
+ LOG(INFO, "serve at %s:%d", config.bind.ip, config.bind.port);
for (i = 0; i < config.thread; i++) {
if ((err = pthread_join(contexts[i].thread, NULL)) != 0) {
Index: src/epoll-shim/Makefile
--- src/epoll-shim/Makefile.orig 2017-10-31 00:49:08.315315000 +0100
+++ src/epoll-shim/Makefile 2017-10-31 00:49:08.315300000 +0100
@@ -0,0 +1,25 @@
+# Static build of the vendored epoll-shim (upstream URL in README).
+# Written for both BSD make and GNU make: only suffix rules and plain
+# variables are used. CC/AR/RANLIB/CFLAGS may come from the environment.
+
+CC ?= cc
+AR ?= ar
+RANLIB ?= ranlib
+
+OBJS = src/epoll.o src/timerfd.o src/signalfd.o src/common.o
+
+all: libepoll.a
+
+# include/ goes first so <sys/epoll.h>, <sys/timerfd.h> and <sys/signalfd.h>
+# resolve to the shim headers rather than to any system copy.
+.c.o:
+	$(CC) -Iinclude $(CFLAGS) -o $@ -c $<
+
+libepoll.a: $(OBJS)
+	$(AR) rc $@ $(OBJS)
+	$(RANLIB) $@
+
+clean:
+	rm -f libepoll.a $(OBJS)
+
+.PHONY: all clean
Index: src/epoll-shim/README
Note: names the upstream project that the files under src/epoll-shim were
taken from; the upstream revision is not recorded.
--- src/epoll-shim/README.orig 2017-10-31 00:49:08.315402000 +0100
+++ src/epoll-shim/README 2017-10-31 00:49:08.315389000 +0100
@@ -0,0 +1 @@
+https://github.com/jiixyj/epoll-shim
Index: src/epoll-shim/include/sys/epoll.h
Note: minimal <sys/epoll.h> for the kqueue-based shim. It declares
epoll_create, epoll_create1, epoll_ctl and epoll_wait; epoll_pwait is compiled
out (#if 0). struct epoll_event is packed only on amd64 (presumably to mirror
the Linux x86-64 layout; confirm).
--- src/epoll-shim/include/sys/epoll.h.orig 2017-10-31 00:49:08.315542000 +0100
+++ src/epoll-shim/include/sys/epoll.h 2017-10-31 00:49:08.315528000 +0100
@@ -0,0 +1,70 @@
+#ifndef SHIM_SYS_EPOLL_H
+#define SHIM_SYS_EPOLL_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdint.h>
+#include <sys/types.h>
+#include <fcntl.h>
+
+#if 0
+#define __NEED_sigset_t
+
+#include <bits/alltypes.h>
+#endif
+
+#define EPOLL_CLOEXEC O_CLOEXEC
+#define EPOLL_NONBLOCK O_NONBLOCK
+
+enum EPOLL_EVENTS { __EPOLL_DUMMY };
+#define EPOLLIN 0x001
+#define EPOLLPRI 0x002
+#define EPOLLOUT 0x004
+#define EPOLLRDNORM 0x040
+#define EPOLLRDBAND 0x080
+#define EPOLLWRNORM 0x100
+#define EPOLLWRBAND 0x200
+#define EPOLLMSG 0x400
+#define EPOLLERR 0x008
+#define EPOLLHUP 0x010
+#define EPOLLRDHUP 0x2000
+#define EPOLLWAKEUP (1U<<29)
+#define EPOLLONESHOT (1U<<30)
+#define EPOLLET (1U<<31)
+
+#define EPOLL_CTL_ADD 1
+#define EPOLL_CTL_DEL 2
+#define EPOLL_CTL_MOD 3
+
+typedef union epoll_data {
+ void *ptr;
+ int fd;
+ uint32_t u32;
+ uint64_t u64;
+} epoll_data_t;
+
+struct epoll_event {
+ uint32_t events;
+ epoll_data_t data;
+}
+#if defined(__amd64__)
+__attribute__((packed))
+#endif
+;
+
+int epoll_create(int);
+int epoll_create1(int);
+int epoll_ctl(int, int, int, struct epoll_event *);
+int epoll_wait(int, struct epoll_event *, int, int);
+#if 0
+int epoll_pwait(int, struct epoll_event *, int, int, const sigset_t *);
+#endif
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* sys/epoll.h */
Index: src/epoll-shim/include/sys/signalfd.h
Note: <sys/signalfd.h> replacement. Besides signalfd() and a 128-byte
struct signalfd_siginfo, it #defines read and close to epoll_shim_read and
epoll_shim_close, so every file that includes it routes those calls through
the shim (signalfd.c itself #undefs them).
--- src/epoll-shim/include/sys/signalfd.h.orig 2017-10-31 00:49:08.315644000 +0100
+++ src/epoll-shim/include/sys/signalfd.h 2017-10-31 00:49:08.315631000 +0100
@@ -0,0 +1,55 @@
+#ifndef SHIM_SYS_SIGNALFD_H
+#define SHIM_SYS_SIGNALFD_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <sys/types.h>
+
+#include <stdint.h>
+#include <fcntl.h>
+#include <signal.h>
+
+#if 0
+#define __NEED_sigset_t
+
+#include <bits/alltypes.h>
+#endif
+
+#define SFD_CLOEXEC O_CLOEXEC
+#define SFD_NONBLOCK O_NONBLOCK
+
+int signalfd(int, const sigset_t *, int);
+
+struct signalfd_siginfo {
+ uint32_t ssi_signo;
+ int32_t ssi_errno;
+ int32_t ssi_code;
+ uint32_t ssi_pid;
+ uint32_t ssi_uid;
+ int32_t ssi_fd;
+ uint32_t ssi_tid;
+ uint32_t ssi_band;
+ uint32_t ssi_overrun;
+ uint32_t ssi_trapno;
+ int32_t ssi_status;
+ int32_t ssi_int;
+ uint64_t ssi_ptr;
+ uint64_t ssi_utime;
+ uint64_t ssi_stime;
+ uint64_t ssi_addr;
+ uint16_t ssi_addr_lsb;
+ uint8_t pad[128-12*4-4*8-2];
+};
+
+extern int epoll_shim_close(int fd);
+extern ssize_t epoll_shim_read(int fd, void *buf, size_t nbytes);
+#define read epoll_shim_read
+#define close epoll_shim_close
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
Index: src/epoll-shim/include/sys/timerfd.h
Note: <sys/timerfd.h> replacement declaring timerfd_create and
timerfd_settime (timerfd_gettime is compiled out). Like the signalfd header,
it #defines read and close to the epoll_shim_* wrappers for every includer.
--- src/epoll-shim/include/sys/timerfd.h.orig 2017-10-31 00:49:08.315738000 +0100
+++ src/epoll-shim/include/sys/timerfd.h 2017-10-31 00:49:08.315725000 +0100
@@ -0,0 +1,34 @@
+#ifndef SHIM_SYS_TIMERFD_H
+#define SHIM_SYS_TIMERFD_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <time.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#define TFD_NONBLOCK O_NONBLOCK
+#define TFD_CLOEXEC O_CLOEXEC
+
+#define TFD_TIMER_ABSTIME 1
+
+struct itimerspec;
+
+int timerfd_create(int, int);
+int timerfd_settime(int, int, const struct itimerspec *, struct itimerspec *);
+#if 0
+int timerfd_gettime(int, struct itimerspec *);
+#endif
+
+extern int epoll_shim_close(int fd);
+extern ssize_t epoll_shim_read(int fd, void *buf, size_t nbytes);
+#define read epoll_shim_read
+#define close epoll_shim_close
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
Index: src/epoll-shim/src/common.c
Note: epoll_shim_read/epoll_shim_close look fd up in the timerfd and signalfd
context lists under the matching mutex and dispatch to the emulation, else
fall through to the real read(2)/close(2). For read, timerfd_read and
signalfd_read release the mutex themselves (hence unlock_after_call=false).
--- src/epoll-shim/src/common.c.orig 2017-10-31 00:49:08.315856000 +0100
+++ src/epoll-shim/src/common.c 2017-10-31 00:49:08.315843000 +0100
@@ -0,0 +1,50 @@
+#include <sys/param.h>
+#include <sys/types.h>
+
+#include <pthread.h>
+#include <stdbool.h>
+#include <unistd.h>
+
+extern pthread_mutex_t timerfd_context_mtx;
+extern struct timerfd_context *get_timerfd_context(int fd, bool create_new);
+extern ssize_t timerfd_read(
+ struct timerfd_context *, void *buf, size_t nbytes);
+extern int timerfd_close(struct timerfd_context *);
+
+extern pthread_mutex_t signalfd_context_mtx;
+extern struct signalfd_context *get_signalfd_context(int fd, bool create_new);
+extern ssize_t signalfd_read(
+ struct signalfd_context *, void *buf, size_t nbytes);
+extern int signalfd_close(struct signalfd_context *);
+
+#define WRAP(context, return_type, call, unlock_after_call) \
+ if (fd >= 0) { \
+ pthread_mutex_lock(&context##_mtx); \
+ struct context *ctx = get_##context(fd, false); \
+ if (ctx) { \
+ return_type ret = (call); \
+ if (unlock_after_call) { \
+ pthread_mutex_unlock(&context##_mtx); \
+ } \
+ return ret; \
+ } \
+ pthread_mutex_unlock(&context##_mtx); \
+ }
+
+int
+epoll_shim_close(int fd)
+{
+ WRAP(timerfd_context, int, timerfd_close(ctx), true)
+ WRAP(signalfd_context, int, signalfd_close(ctx), true)
+
+ return close(fd);
+}
+
+ssize_t
+epoll_shim_read(int fd, void *buf, size_t nbytes)
+{
+ WRAP(timerfd_context, ssize_t, timerfd_read(ctx, buf, nbytes), false)
+ WRAP(signalfd_context, ssize_t, signalfd_read(ctx, buf, nbytes), false)
+
+ return read(fd, buf, nbytes);
+}
Index: src/epoll-shim/src/epoll.c
Note: epoll API emulated on kqueue. Each registered fd gets EVFILT_READ and
EVFILT_WRITE knotes plus EVFILT_USER "cookie" knotes (ident fd*2, and fd*2+1
when EPOLLRDHUP is requested). Only EPOLLIN, EPOLLOUT, EPOLLHUP, EPOLLRDHUP and
EPOLLERR are accepted, so EPOLLET/EPOLLONESHOT fail with EINVAL. A single fd
that kqueue cannot watch (ENODEV) is handled with poll(2) in epoll_wait.
--- src/epoll-shim/src/epoll.c.orig 2017-10-31 00:49:08.315986000 +0100
+++ src/epoll-shim/src/epoll.c 2017-10-31 00:49:08.315974000 +0100
@@ -0,0 +1,290 @@
+#include <sys/epoll.h>
+
+#include <sys/types.h>
+
+#include <sys/event.h>
+#include <sys/param.h>
+#include <sys/queue.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+
+#include <errno.h>
+#include <poll.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+int
+epoll_create(int size __unused)
+{
+ fprintf(stderr,
+ "ERROR: epoll_create() is deprecated, use "
+ "epoll_create1(EPOLL_CLOEXEC).\n");
+ errno = EINVAL;
+ return -1;
+}
+
+int
+epoll_create1(int flags)
+{
+ if (flags != EPOLL_CLOEXEC) {
+ fprintf(stderr, "ERROR: Use epoll_create1(EPOLL_CLOEXEC).\n");
+ errno = EINVAL;
+ return -1;
+ }
+ return kqueue();
+}
+
+static int poll_fd = -1;
+static int poll_epoll_fd = -1;
+static void *poll_ptr;
+
+int
+epoll_ctl(int fd, int op, int fd2, struct epoll_event *ev)
+{
+ if ((!ev && op != EPOLL_CTL_DEL) ||
+ (ev &&
+ ((ev->events &
+ ~(EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLRDHUP | EPOLLERR))
+ /* the user should really set one of EPOLLIN or EPOLLOUT
+ * so that EPOLLHUP and EPOLLERR work. Don't make this a
+ * hard error for now, though. */
+ /* || !(ev->events & (EPOLLIN | EPOLLOUT)) */))) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ struct kevent kev[4];
+
+ if (op == EPOLL_CTL_ADD) {
+ /* Check if the fd already has been registered in this kqueue.
+ * See below for an explanation of this 'cookie' mechanism. */
+ EV_SET(&kev[0], fd2 * 2, EVFILT_USER, 0, 0, 0, 0);
+ if (!(kevent(fd, kev, 1, NULL, 0, NULL) < 0 &&
+ errno == ENOENT)) {
+ errno = EEXIST;
+ return -1;
+ }
+
+ EV_SET(&kev[0], fd2, EVFILT_READ,
+ EV_ADD | (ev->events & EPOLLIN ? 0 : EV_DISABLE), 0, 0,
+ ev->data.ptr);
+ EV_SET(&kev[1], fd2, EVFILT_WRITE,
+ EV_ADD | (ev->events & EPOLLOUT ? 0 : EV_DISABLE), 0, 0,
+ ev->data.ptr);
+ /* We save a 'cookie' knote inside the kq to signal if the fd
+ * has been 'registered'. We need this because there is no way
+ * to ask a kqueue if a knote has been registered without
+ * modifying the udata. */
+ EV_SET(&kev[2], fd2 * 2, EVFILT_USER, EV_ADD, 0, 0, 0);
+ EV_SET(&kev[3], fd2 * 2 + 1, EVFILT_USER,
+ (ev->events & EPOLLRDHUP) ? EV_ADD : 0, 0, 0, 0);
+ } else if (op == EPOLL_CTL_DEL) {
+ if (poll_fd == fd2 && fd == poll_epoll_fd) {
+ poll_fd = -1;
+ poll_epoll_fd = -1;
+ poll_ptr = NULL;
+ return 0;
+ }
+
+ EV_SET(&kev[0], fd2, EVFILT_READ, EV_DELETE, 0, 0, 0);
+ EV_SET(&kev[1], fd2, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+ EV_SET(&kev[2], fd2 * 2, EVFILT_USER, EV_DELETE, 0, 0, 0);
+ EV_SET(&kev[3], fd2 * 2 + 1, EVFILT_USER, EV_DELETE, 0, 0, 0);
+ } else if (op == EPOLL_CTL_MOD) {
+ EV_SET(&kev[0], fd2, EVFILT_READ,
+ ev->events & EPOLLIN ? EV_ENABLE : EV_DISABLE, 0, 0,
+ ev->data.ptr);
+ EV_SET(&kev[1], fd2, EVFILT_WRITE,
+ ev->events & EPOLLOUT ? EV_ENABLE : EV_DISABLE, 0, 0,
+ ev->data.ptr);
+ EV_SET(&kev[2], fd2 * 2, EVFILT_USER, 0, 0, 0, 0);
+ EV_SET(&kev[3], fd2 * 2 + 1, EVFILT_USER,
+ (ev->events & EPOLLRDHUP) ? EV_ADD : EV_DELETE, 0, 0, 0);
+ } else {
+ errno = EINVAL;
+ return -1;
+ }
+
+ for (int i = 0; i < 4; ++i) {
+ kev[i].flags |= EV_RECEIPT;
+ }
+
+ int ret = kevent(fd, kev, 4, kev, 4, NULL);
+ if (ret < 0) {
+ return -1;
+ }
+
+ if (ret != 4) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ for (int i = 0; i < 4; ++i) {
+ if (kev[i].flags != EV_ERROR) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (kev[i].data == ENODEV && poll_fd < 0) {
+ poll_fd = fd2;
+ poll_epoll_fd = fd;
+ poll_ptr = ev->data.ptr;
+ return 0;
+ }
+
+ /* ignore EVFILT_WRITE registration EINVAL errors (some fd
+ * types such as kqueues themselves don't support it) */
+ if (i == 1 && kev[i].data == EINVAL) {
+ continue;
+ }
+
+ /* ignore EVFILT_USER registration ENOENT errors (used for
+ * EPOLLRDHUP cookie fd and may therefore not exist) */
+ if (i == 3 && kev[i].data == ENOENT) {
+ continue;
+ }
+
+ if (kev[i].data != 0) {
+ errno = kev[i].data;
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+#if 0
+int
+epoll_pwait(
+ int fd, struct epoll_event *ev, int cnt, int to, const sigset_t *sigs)
+{
+ int r = __syscall(SYS_epoll_pwait, fd, ev, cnt, to, sigs, _NSIG / 8);
+#ifdef SYS_epoll_wait
+ if (r == -ENOSYS && !sigs)
+ r = __syscall(SYS_epoll_wait, fd, ev, cnt, to);
+#endif
+ return __syscall_ret(r);
+}
+#endif
+
+int
+epoll_wait(int fd, struct epoll_event *ev, int cnt, int to)
+{
+ if (cnt < 1) {
+ errno = EINVAL;
+ return -1;
+ }
+ /* At most 32 events are fetched per call (size of evlist below).
+ * Like epoll_wait(2), returning fewer than cnt events is allowed,
+ * so clamp larger requests instead of rejecting them. */
+ if (cnt > 32) {
+ cnt = 32;
+ }
+
+ if (poll_fd != -1 && fd == poll_epoll_fd) {
+ struct pollfd pfds[2];
+ pfds[0].fd = poll_fd;
+ pfds[0].events = POLLIN;
+ pfds[1].fd = fd;
+ pfds[1].events = POLLIN;
+ int ret = poll(pfds, 2, to);
+ if (ret <= 0) {
+ return ret;
+ }
+ if (pfds[0].revents & POLLIN) {
+ ev[0].events = EPOLLIN;
+ ev[0].data.ptr = poll_ptr;
+ return 1;
+ }
+ to = 0;
+ }
+
+ struct kevent evlist[32];
+
+ struct timespec timeout = {0, 0};
+ if (to > 0) {
+ timeout.tv_sec = to / 1000;
+ timeout.tv_nsec = (to % 1000) * 1000 * 1000;
+ }
+
+ struct timespec *ptimeout = NULL;
+ if (to >= 0) {
+ ptimeout = &timeout;
+ }
+
+ int ret = kevent(fd, NULL, 0, evlist, cnt, ptimeout);
+ if (ret < 0) {
+ return -1;
+ }
+
+ for (int i = 0; i < ret; ++i) {
+ int events = 0;
+ if (evlist[i].filter == EVFILT_READ) {
+ events |= EPOLLIN;
+ } else if (evlist[i].filter == EVFILT_WRITE) {
+ events |= EPOLLOUT;
+ }
+
+ if (evlist[i].flags & EV_ERROR) {
+ events |= EPOLLERR;
+ }
+
+ if (evlist[i].flags & EV_EOF) {
+ // fprintf(stderr, "got fflags: %d\n",
+ // evlist[i].fflags);
+ if (evlist[i].fflags) {
+ events |= EPOLLERR;
+ }
+
+ int epoll_event = EPOLLHUP;
+
+ struct stat statbuf;
+ if (fstat(evlist[i].ident, &statbuf) < 0) {
+ return -1;
+ }
+
+ if (S_ISFIFO(statbuf.st_mode)) {
+ if (evlist[i].filter == EVFILT_READ &&
+ evlist[i].data == 0) {
+ events &= ~EPOLLIN;
+ } else if (evlist[i].filter == EVFILT_WRITE) {
+ epoll_event = EPOLLERR;
+ }
+ } else if (S_ISSOCK(statbuf.st_mode) &&
+ evlist[i].filter == EVFILT_READ) {
+ /* do some special EPOLLRDHUP handling for
+ * sockets */
+ struct kevent kev[1];
+ /* if we are reading, we just know for
+ * sure that we can't receive any more,
+ * so use EPOLLIN/EPOLLRDHUP per
+ * default */
+ epoll_event = EPOLLIN;
+ EV_SET(&kev[0], evlist[i].ident * 2 + 1,
+ EVFILT_USER, 0, 0, 0, 0);
+ int old_errno = errno;
+ if (!(kevent(fd, kev, 1, NULL, 0, NULL) ==
+ -1 &&
+ errno == ENOENT)) {
+ epoll_event |= EPOLLRDHUP;
+ }
+ errno = old_errno;
+
+ /* only set EPOLLHUP if the stat says
+ * that writing is also impossible */
+ if (!(statbuf.st_mode &
+ (S_IWUSR | S_IWGRP | S_IWOTH))) {
+ epoll_event |= EPOLLHUP;
+ }
+ }
+
+ events |= epoll_event;
+ }
+ ev[i].events = events;
+ ev[i].data.ptr = evlist[i].udata;
+ }
+ return ret;
+}
Index: src/epoll-shim/src/signalfd.c
Note: signalfd emulation. Each signalfd is a kqueue with one EVFILT_SIGNAL
knote per signal in the mask; only fd == -1 (create) is supported. read()
must ask for exactly one struct signalfd_siginfo and gets only ssi_signo
filled in. Of the flags, only SFD_NONBLOCK is consulted (in signalfd_read).
Contexts whose fd is -1 are reused by get_signalfd_context(-1, true).
--- src/epoll-shim/src/signalfd.c.orig 2017-10-31 00:49:08.316086000 +0100
+++ src/epoll-shim/src/signalfd.c 2017-10-31 00:49:08.316074000 +0100
@@ -0,0 +1,143 @@
+#include <sys/signalfd.h>
+#undef read
+#undef close
+
+#include <sys/types.h>
+
+#include <sys/event.h>
+#include <sys/param.h>
+#include <sys/time.h>
+
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+struct signalfd_context {
+ int fd;
+ int flags;
+ struct signalfd_context *next;
+};
+
+static struct signalfd_context *signalfd_contexts;
+pthread_mutex_t signalfd_context_mtx = PTHREAD_MUTEX_INITIALIZER;
+
+struct signalfd_context *
+get_signalfd_context(int fd, bool create_new)
+{
+ for (struct signalfd_context *ctx = signalfd_contexts; ctx;
+ ctx = ctx->next) {
+ if (fd == ctx->fd) {
+ return ctx;
+ }
+ }
+
+ if (create_new) {
+ struct signalfd_context *new_ctx =
+ calloc(1, sizeof(struct signalfd_context));
+ if (!new_ctx) {
+ return NULL;
+ }
+ new_ctx->fd = -1;
+ new_ctx->next = signalfd_contexts;
+ signalfd_contexts = new_ctx;
+ return new_ctx;
+ }
+
+ return NULL;
+}
+
+static int
+signalfd_impl(int fd, const sigset_t *sigs, int flags)
+{
+ if (fd != -1) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (flags & ~(SFD_NONBLOCK | SFD_CLOEXEC)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ struct signalfd_context *ctx = get_signalfd_context(-1, true);
+ if (!ctx) {
+ errno = EMFILE;
+ return -1;
+ }
+
+ ctx->fd = kqueue();
+ if (ctx->fd == -1) {
+ return -1;
+ }
+
+ ctx->flags = flags;
+
+ struct kevent kevs[_SIG_MAXSIG];
+ int n = 0;
+
+ for (int i = 1; i <= _SIG_MAXSIG; ++i) {
+ if (sigismember(sigs, i)) {
+ EV_SET(&kevs[n++], i, EVFILT_SIGNAL, EV_ADD, 0, 0, 0);
+ }
+ }
+
+ int ret = kevent(ctx->fd, kevs, n, NULL, 0, NULL);
+ if (ret == -1) {
+ close(ctx->fd);
+ ctx->fd = -1;
+ return -1;
+ }
+
+ return ctx->fd;
+}
+
+int
+signalfd(int fd, const sigset_t *sigs, int flags)
+{
+ pthread_mutex_lock(&signalfd_context_mtx);
+ int ret = signalfd_impl(fd, sigs, flags);
+ pthread_mutex_unlock(&signalfd_context_mtx);
+ return ret;
+}
+
+ssize_t
+signalfd_read(struct signalfd_context *ctx, void *buf, size_t nbytes)
+{
+ int fd = ctx->fd;
+ int flags = ctx->flags;
+ pthread_mutex_unlock(&signalfd_context_mtx);
+
+ // TODO: fix this to read multiple signals
+ if (nbytes != sizeof(struct signalfd_siginfo)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ struct timespec timeout = {0, 0};
+ struct kevent kev;
+ int ret = kevent(
+ fd, NULL, 0, &kev, 1, (flags & SFD_NONBLOCK) ? &timeout : NULL);
+ if (ret == -1) {
+ return -1;
+ } else if (ret == 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ memset(buf, '\0', nbytes);
+ struct signalfd_siginfo *sig_buf = buf;
+ sig_buf->ssi_signo = (uint32_t)kev.ident;
+ return (ssize_t)nbytes;
+}
+
+int
+signalfd_close(struct signalfd_context *ctx)
+{
+ int ret = close(ctx->fd);
+ ctx->fd = -1;
+ return ret;
+}
Index: src/epoll-shim/src/timerfd.c
Note: timerfd emulation. Each timerfd is a kqueue holding one EVFILT_USER
knote. A worker thread sigwaitinfo()s for the POSIX timer's SIGRTMIN and
triggers that knote with timer_getoverrun() in udata; SIGRTMIN + 1 tells the
worker to exit. All failure paths return -1 with errno set.
--- src/epoll-shim/src/timerfd.c.orig 2017-10-31 00:49:08.316198000 +0100
+++ src/epoll-shim/src/timerfd.c 2017-10-31 00:49:08.316185000 +0100
@@ -0,0 +1,239 @@
+#include <sys/timerfd.h>
+#undef read
+#undef close
+
+#include <sys/types.h>
+
+#include <sys/event.h>
+#include <sys/param.h>
+#include <sys/time.h>
+
+#include <pthread.h>
+#include <pthread_np.h>
+
+#include <errno.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct timerfd_context {
+ int fd;
+ pthread_t worker;
+ timer_t timer;
+ int flags;
+ struct timerfd_context *next;
+};
+
+static struct timerfd_context *timerfd_contexts;
+pthread_mutex_t timerfd_context_mtx = PTHREAD_MUTEX_INITIALIZER;
+
+struct timerfd_context *
+get_timerfd_context(int fd, bool create_new)
+{
+ for (struct timerfd_context *ctx = timerfd_contexts; ctx;
+ ctx = ctx->next) {
+ if (fd == ctx->fd) {
+ return ctx;
+ }
+ }
+
+ if (create_new) {
+ struct timerfd_context *new_ctx =
+ calloc(1, sizeof(struct timerfd_context));
+ if (!new_ctx) {
+ return NULL;
+ }
+ new_ctx->fd = -1;
+ new_ctx->next = timerfd_contexts;
+ timerfd_contexts = new_ctx;
+ return new_ctx;
+ }
+
+ return NULL;
+}
+
+static void *
+worker_function(void *arg)
+{
+ struct timerfd_context *ctx = arg;
+
+ siginfo_t info;
+ sigset_t set;
+ sigemptyset(&set);
+ sigaddset(&set, SIGRTMIN);
+ sigaddset(&set, SIGRTMIN + 1);
+ (void)pthread_sigmask(SIG_BLOCK, &set, NULL);
+
+ struct kevent kev;
+ EV_SET(&kev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0,
+ (void *)(intptr_t)pthread_getthreadid_np());
+ (void)kevent(ctx->fd, &kev, 1, NULL, 0, NULL);
+
+ for (;;) {
+ if (sigwaitinfo(&set, &info) != SIGRTMIN) {
+ break;
+ }
+ EV_SET(&kev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0,
+ (void *)(intptr_t)timer_getoverrun(ctx->timer));
+ (void)kevent(ctx->fd, &kev, 1, NULL, 0, NULL);
+ }
+
+ return NULL;
+}
+
+static int
+timerfd_create_impl(int clockid, int flags)
+{
+ if (clockid != CLOCK_MONOTONIC && clockid != CLOCK_REALTIME) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (flags & ~(TFD_CLOEXEC | TFD_NONBLOCK)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ struct timerfd_context *ctx = get_timerfd_context(-1, true);
+ if (!ctx) {
+ errno = ENOMEM;
+ return -1;
+ }
+
+ ctx->fd = kqueue();
+ if (ctx->fd == -1) {
+ return -1;
+ }
+
+ ctx->flags = flags;
+
+ struct kevent kev;
+ EV_SET(&kev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, 0);
+ if (kevent(ctx->fd, &kev, 1, NULL, 0, NULL) == -1) {
+ close(ctx->fd);
+ ctx->fd = -1;
+ return -1;
+ }
+
+ /* pthread_create() returns an error number; it does not set errno. */
+ int err = pthread_create(&ctx->worker, NULL, worker_function, ctx);
+ if (err != 0) {
+ close(ctx->fd);
+ ctx->fd = -1;
+ errno = err;
+ return -1;
+ }
+
+ int ret = kevent(ctx->fd, NULL, 0, &kev, 1, NULL);
+ if (ret == -1) {
+ pthread_kill(ctx->worker, SIGRTMIN + 1);
+ pthread_join(ctx->worker, NULL);
+ close(ctx->fd);
+ ctx->fd = -1;
+ return -1;
+ }
+
+ int tid = (int)(intptr_t)kev.udata;
+
+ struct sigevent sigev = {.sigev_notify = SIGEV_THREAD_ID,
+ .sigev_signo = SIGRTMIN,
+ .sigev_notify_thread_id = tid};
+
+ if (timer_create(clockid, &sigev, &ctx->timer) == -1) {
+ pthread_kill(ctx->worker, SIGRTMIN + 1);
+ pthread_join(ctx->worker, NULL);
+ close(ctx->fd);
+ ctx->fd = -1;
+ return -1;
+ }
+
+ return ctx->fd;
+}
+
+int
+timerfd_create(int clockid, int flags)
+{
+ pthread_mutex_lock(&timerfd_context_mtx);
+ int ret = timerfd_create_impl(clockid, flags);
+ pthread_mutex_unlock(&timerfd_context_mtx);
+ return ret;
+}
+
+static int
+timerfd_settime_impl(
+ int fd, int flags, const struct itimerspec *new, struct itimerspec *old)
+{
+ struct timerfd_context *ctx = get_timerfd_context(fd, false);
+ if (!ctx) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (flags & ~(TFD_TIMER_ABSTIME)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ return timer_settime(ctx->timer,
+ (flags & TFD_TIMER_ABSTIME) ? TIMER_ABSTIME : 0, new, old);
+}
+
+int
+timerfd_settime(
+ int fd, int flags, const struct itimerspec *new, struct itimerspec *old)
+{
+ pthread_mutex_lock(&timerfd_context_mtx);
+ int ret = timerfd_settime_impl(fd, flags, new, old);
+ pthread_mutex_unlock(&timerfd_context_mtx);
+ return ret;
+}
+
+#if 0
+int timerfd_gettime(int fd, struct itimerspec *cur)
+{
+ return syscall(SYS_timerfd_gettime, fd, cur);
+}
+#endif
+
+ssize_t
+timerfd_read(struct timerfd_context *ctx, void *buf, size_t nbytes)
+{
+ int fd = ctx->fd;
+ int flags = ctx->flags;
+ pthread_mutex_unlock(&timerfd_context_mtx);
+
+ if (nbytes < sizeof(uint64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ struct timespec timeout = {0, 0};
+ struct kevent kev;
+ int ret = kevent(
+ fd, NULL, 0, &kev, 1, (flags & TFD_NONBLOCK) ? &timeout : NULL);
+ if (ret == -1) {
+ return -1;
+ } else if (ret == 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ /* udata carries the timer_getoverrun() count posted by worker_function. */
+ uint64_t nr_expired = 1 + (uint64_t)(uintptr_t)kev.udata;
+ memcpy(buf, &nr_expired, sizeof(uint64_t));
+
+ return sizeof(uint64_t);
+}
+
+int
+timerfd_close(struct timerfd_context *ctx)
+{
+ timer_delete(ctx->timer);
+ pthread_kill(ctx->worker, SIGRTMIN + 1);
+ pthread_join(ctx->worker, NULL);
+ int ret = close(ctx->fd);
+ ctx->fd = -1;
+ return ret;
+}
Index: src/event.c
Note: on FreeBSD the shim's epoll_create() always fails, so
epoll_create1(EPOLL_CLOEXEC) is used. The shim also rejects EPOLLET, so
FreeBSD registrations start from 0 and are level-triggered, not edge-triggered.
NOTE(review): confirm the event loop copes with repeated level-triggered
wakeups for data it has not drained yet.
--- src/event.c.orig 2017-09-11 08:34:23.000000000 +0200
+++ src/event.c 2017-10-31 00:53:52.719983000 +0100
@@ -16,7 +16,11 @@
assert(nevent > 0);
+#if defined(__linux__)
epfd = epoll_create(nevent);
+#elif defined(__FreeBSD__)
+ epfd = epoll_create1(EPOLL_CLOEXEC);
+#endif
if (epfd < 0) {
return -1;
}
@@ -48,7 +52,11 @@
struct epoll_event event;
event.data.ptr = c;
+#if defined(__linux__)
event.events = EPOLLET;
+#elif defined(__FreeBSD__)
+ event.events = 0;
+#endif
if (mask & E_WRITABLE) event.events |= EPOLLOUT;
if (mask & E_READABLE) event.events |= EPOLLIN;
@@ -67,7 +75,11 @@
int op = mask == E_NONE ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
event.data.ptr = c;
+#if defined(__linux__)
event.events = EPOLLET;
+#elif defined(__FreeBSD__)
+ event.events = 0;
+#endif
if (mask & E_WRITABLE) event.events |= EPOLLOUT;
if (mask & E_READABLE) event.events |= EPOLLIN;
Index: src/event.h
Note: on FreeBSD the shim's epoll header is included by relative path; the
implementation comes from epoll-shim/libepoll.a, which src/Makefile adds to
LIBS on FreeBSD.
--- src/event.h.orig 2017-09-11 08:34:23.000000000 +0200
+++ src/event.h 2017-10-31 00:49:08.316395000 +0100
@@ -1,7 +1,11 @@
#ifndef EVENT_H
#define EVENT_H
+#if defined(__linux__)
#include <sys/epoll.h>
+#elif defined(__FreeBSD__)
+#include "epoll-shim/include/sys/epoll.h"
+#endif
#include "connection.h"
#define E_NONE 0
Index: src/logging.c
Note: log prefixes now show bind as ip:port. The thread id comes from
thr_self() on FreeBSD and gettid on Linux, and is 0 on any other system.
--- src/logging.c.orig 2017-09-11 08:34:23.000000000 +0200
+++ src/logging.c 2017-10-31 00:49:08.316511000 +0100
@@ -7,6 +7,9 @@
#include <time.h>
#include "corvus.h"
#include "logging.h"
+#if defined(__FreeBSD__)
+#include <sys/thr.h>
+#endif
static const char *LEVEL_MAP[] = {"DEBUG", "INFO", "WARN", "ERROR", "CRIT"};
static const int SYSLOG_LEVEL_MAP[] = {LOG_DEBUG, LOG_INFO, LOG_WARNING, LOG_ERR, LOG_CRIT};
@@ -20,7 +23,15 @@
if (level < ATOMIC_GET(config.loglevel)) return;
+#if defined(__FreeBSD__)
+ long lwpid;
+ thr_self(&lwpid);
+ pid_t thread_id = (pid_t)lwpid;
+#elif defined(__linux__)
pid_t thread_id = (pid_t)syscall(SYS_gettid);
+#else
+ pid_t thread_id = (pid_t)0;
+#endif
pid_t process_id = getpid();
va_start(ap, fmt);
@@ -28,14 +39,14 @@
va_end(ap);
if (config.syslog) {
- syslog(SYSLOG_LEVEL_MAP[level], "[%s %d %d %d] %s", config.cluster,
- (int)config.bind, (int)process_id, (int)thread_id, msg);
+ syslog(SYSLOG_LEVEL_MAP[level], "[%s %s:%d %d %d] %s", config.cluster,
+ config.bind.ip, (int)config.bind.port, (int)process_id, (int)thread_id, msg);
} else {
gettimeofday(&now, NULL);
int n = strftime(timestamp, sizeof(timestamp), "%Y-%m-%d %H:%M:%S,",
localtime(&now.tv_sec));
snprintf(timestamp + n, sizeof(timestamp) - n, "%03d", (int)now.tv_usec/1000);
- fprintf(stderr, "%s %s [%s %d %d %d]: %s (%s:%d)\n", timestamp, LEVEL_MAP[level],
- config.cluster, (int)config.bind, (int)process_id, (int)thread_id, msg, file, line);
+ fprintf(stderr, "%s %s [%s %s:%d %d %d]: %s (%s:%d)\n", timestamp, LEVEL_MAP[level],
+ config.cluster, config.bind.ip, (int)config.bind.port, (int)process_id, (int)thread_id, msg, file, line);
}
}
Index: src/socket.c
Note: FreeBSD has no eventfd(2) here, so socket_create_eventfd() returns a
kqueue carrying one EVFILT_USER knote (ident 123456), or -1 after logging.
NOTE(review): other code presumably signals and drains it with NOTE_TRIGGER
instead of write()/read(); that code is not visible in this patch, so confirm.
--- src/socket.c.orig 2017-09-11 08:34:23.000000000 +0200
+++ src/socket.c 2017-10-31 00:49:08.316651000 +0100
@@ -2,7 +2,13 @@
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <sys/uio.h>
+#if defined(__linux__)
#include <sys/eventfd.h>
+#elif defined(__FreeBSD__)
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+#endif
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
@@ -374,10 +380,25 @@
int socket_create_eventfd()
{
+#if defined(__linux__)
int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (fd == -1) {
LOG(WARN, "%s: %s", __func__, strerror(errno));
}
+#elif defined(__FreeBSD__)
+ int fd = kqueue();
+ if (fd == -1)
+ LOG(WARN, "%s: kqueue: %s", __func__, strerror(errno));
+ else {
+ struct kevent change;
+ EV_SET(&change, 123456, EVFILT_USER, EV_ADD, 0, 0, NULL);
+ if (kevent(fd, &change, 1, NULL, 0, NULL) < 0) {
+ LOG(WARN, "%s: kevent: %s", __func__, strerror(errno));
+ close(fd);
+ fd = -1;
+ }
+ }
+#endif
return fd;
}
Index: src/socket.h
Note: adds <sys/types.h> and <netinet/in.h> ahead of <arpa/inet.h>,
presumably for definitions FreeBSD does not pull in implicitly; confirm.
--- src/socket.h.orig 2017-09-11 08:34:23.000000000 +0200
+++ src/socket.h 2017-10-31 00:49:08.316757000 +0100
@@ -1,6 +1,8 @@
#ifndef SOCKET_H
#define SOCKET_H
+#include <sys/types.h>
+#include <netinet/in.h>
#include <arpa/inet.h>
#include <limits.h>
#include "mbuf.h"
Index: src/timer.c
Note: on FreeBSD this includes the shim's timerfd header, which also #defines
read and close to epoll_shim_read/epoll_shim_close. Every read()/close() in
this file therefore goes through the shim, which falls back to the real calls
for fds that are not timerfds or signalfds.
--- src/timer.c.orig 2017-09-11 08:34:23.000000000 +0200
+++ src/timer.c 2017-10-31 00:49:08.316864000 +0100
@@ -1,4 +1,10 @@
+
+#if defined(__linux__)
#include <sys/timerfd.h>
+#endif
+#if defined(__FreeBSD__)
+#include "epoll-shim/include/sys/timerfd.h"
+#endif
#include <string.h>
#include <errno.h>
#include <time.h>