summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/aio/aio_cancel.c16
-rw-r--r--src/aio/aio_error.c6
-rw-r--r--src/aio/aio_fsync.c9
-rw-r--r--src/aio/aio_readwrite.c104
-rw-r--r--src/aio/aio_return.c6
-rw-r--r--src/aio/aio_suspend.c57
-rw-r--r--src/aio/lio_listio.c140
7 files changed, 338 insertions, 0 deletions
diff --git a/src/aio/aio_cancel.c b/src/aio/aio_cancel.c
new file mode 100644
index 00000000..5a753b1f
--- /dev/null
+++ b/src/aio/aio_cancel.c
@@ -0,0 +1,16 @@
+#include <aio.h>
+#include <pthread.h>
+#include <errno.h>
+
+int aio_cancel(int fd, struct aiocb *cb)
+{
+ if (!cb) {
+ /* FIXME: for correctness, we should return AIO_ALLDONE
+ * if there are no outstanding aio operations on this
+ * file descriptor, but that would require making aio
+ * much slower, and seems to have little advantage since
+ * we don't support cancellation anyway. */
+ return AIO_NOTCANCELED;
+ }
+ return cb->__err==EINPROGRESS ? AIO_NOTCANCELED : AIO_ALLDONE;
+}
diff --git a/src/aio/aio_error.c b/src/aio/aio_error.c
new file mode 100644
index 00000000..169a9a30
--- /dev/null
+++ b/src/aio/aio_error.c
@@ -0,0 +1,6 @@
+#include <aio.h>
+
+int aio_error(struct aiocb *cb)
+{
+ return cb->__err;
+}
diff --git a/src/aio/aio_fsync.c b/src/aio/aio_fsync.c
new file mode 100644
index 00000000..0ac6ea87
--- /dev/null
+++ b/src/aio/aio_fsync.c
@@ -0,0 +1,9 @@
+#include <aio.h>
+#include <errno.h>
+
+int aio_fsync(int op, struct aiocb *cb)
+{
+ /* FIXME: unsupported */
+ errno = EINVAL;
+ return -1;
+}
diff --git a/src/aio/aio_readwrite.c b/src/aio/aio_readwrite.c
new file mode 100644
index 00000000..27168f25
--- /dev/null
+++ b/src/aio/aio_readwrite.c
@@ -0,0 +1,104 @@
+#include <aio.h>
+#include <fcntl.h>
+#include "pthread_impl.h"
+
+static void dummy(void)
+{
+}
+
+weak_alias(dummy, __aio_wake);
+
+static void notify_signal(struct sigevent *sev)
+{
+ siginfo_t si = {
+ .si_signo = sev->sigev_signo,
+ .si_value = sev->sigev_value,
+ .si_code = SI_ASYNCIO,
+ .si_pid = __pthread_self()->pid,
+ .si_uid = getuid()
+ };
+ __syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si);
+}
+
+static void *io_thread(void *p)
+{
+ struct aiocb *cb = p;
+ int fd = cb->aio_filedes;
+ void *buf = (void *)cb->aio_buf;
+ size_t len = cb->aio_nbytes;
+ off_t off = cb->aio_offset;
+ int op = cb->aio_lio_opcode;
+ struct sigevent sev = cb->aio_sigevent;
+ ssize_t ret;
+
+ if (op == LIO_WRITE) {
+ if ( (fcntl(fd, F_GETFL) & O_APPEND)
+ ||((ret = pwrite(fd, buf, len, off))<0 && errno==ESPIPE) )
+ ret = write(fd, buf, len);
+ } else if (op == LIO_READ) {
+ if ( (ret = pread(fd, buf, len, off))<0 && errno==ESPIPE )
+ ret = read(fd, buf, len);
+ } else {
+ ret = 0;
+ }
+ cb->__ret = ret;
+
+ if (ret < 0) a_store(&cb->__err, errno);
+ else a_store(&cb->__err, 0);
+
+ __aio_wake();
+
+ switch (cb->aio_sigevent.sigev_notify) {
+ case SIGEV_SIGNAL:
+ notify_signal(&sev);
+ break;
+ case SIGEV_THREAD:
+ sev.sigev_notify_function(sev.sigev_value);
+ break;
+ }
+
+ return 0;
+}
+
+static int new_req(struct aiocb *cb)
+{
+ int ret = 0;
+ pthread_attr_t a;
+ sigset_t set;
+ pthread_t td;
+
+ if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) {
+ if (cb->aio_sigevent.sigev_notify_attributes)
+ a = *cb->aio_sigevent.sigev_notify_attributes;
+ else
+ pthread_attr_init(&a);
+ } else {
+ pthread_attr_init(&a);
+ pthread_attr_setstacksize(&a, PAGE_SIZE);
+ pthread_attr_setguardsize(&a, 0);
+ }
+ pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);
+ sigfillset(&set);
+ pthread_sigmask(SIG_BLOCK, &set, &set);
+ cb->__err = EINPROGRESS;
+ if (pthread_create(&td, &a, io_thread, cb)) {
+ errno = EAGAIN;
+ ret = -1;
+ }
+ pthread_sigmask(SIG_SETMASK, &set, 0);
+ cb->__td = td;
+
+ return ret;
+}
+
+ssize_t aio_read(struct aiocb *cb)
+{
+ cb->aio_lio_opcode = LIO_READ;
+ return new_req(cb);
+}
+
+ssize_t aio_write(struct aiocb *cb)
+{
+ cb->aio_lio_opcode = LIO_WRITE;
+ return new_req(cb);
+}
diff --git a/src/aio/aio_return.c b/src/aio/aio_return.c
new file mode 100644
index 00000000..df10bdbe
--- /dev/null
+++ b/src/aio/aio_return.c
@@ -0,0 +1,6 @@
+#include <aio.h>
+
+ssize_t aio_return(struct aiocb *cb)
+{
+ return cb->__ret;
+}
diff --git a/src/aio/aio_suspend.c b/src/aio/aio_suspend.c
new file mode 100644
index 00000000..cb2539e9
--- /dev/null
+++ b/src/aio/aio_suspend.c
@@ -0,0 +1,57 @@
+#include <aio.h>
+#include <errno.h>
+#include "pthread_impl.h"
+
+/* Due to the requirement that aio_suspend be async-signal-safe, we cannot
+ * use any locks, wait queues, etc. that would make it more efficient. The
+ * only obviously-correct algorithm is to generate a wakeup every time any
+ * aio operation finishes and have aio_suspend re-evaluate the completion
+ * status of each aiocb it was waiting on. */
+
+static volatile int seq;
+
+void __aio_wake(void)
+{
+ a_inc(&seq);
+ __wake(&seq, -1, 1);
+}
+
+int aio_suspend(struct aiocb *const cbs[], int cnt, const struct timespec *ts)
+{
+ int i, last, first=1, ret=0;
+ struct timespec at;
+
+ if (cnt<0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ for (;;) {
+ last = seq;
+
+ for (i=0; i<cnt; i++) {
+ if (cbs[i] && cbs[i]->__err != EINPROGRESS)
+ return 0;
+ }
+
+ if (first && ts) {
+ clock_gettime(CLOCK_MONOTONIC, &at);
+ at.tv_sec += ts->tv_sec;
+ if ((at.tv_nsec += ts->tv_nsec) >= 1000000000) {
+ at.tv_nsec -= 1000000000;
+ at.tv_sec++;
+ }
+ first = 0;
+ }
+
+ ret = __timedwait(&seq, last, CLOCK_MONOTONIC,
+ ts ? &at : 0, 0, 0, 1);
+
+ if (ret == ETIMEDOUT) ret = EAGAIN;
+
+ if (ret) {
+ errno = ret;
+ return -1;
+ }
+ }
+}
diff --git a/src/aio/lio_listio.c b/src/aio/lio_listio.c
new file mode 100644
index 00000000..8865029a
--- /dev/null
+++ b/src/aio/lio_listio.c
@@ -0,0 +1,140 @@
+#include <aio.h>
+#include <errno.h>
+#include "pthread_impl.h"
+
+struct lio_state {
+ struct sigevent *sev;
+ int cnt;
+ struct aiocb *cbs[];
+};
+
+static int lio_wait(struct lio_state *st)
+{
+ int i, err, got_err;
+ int cnt = st->cnt;
+ struct aiocb **cbs = st->cbs;
+
+ for (;;) {
+ for (i=0; i<cnt; i++) {
+ if (!cbs[i]) continue;
+ err = aio_error(cbs[i]);
+ if (err==EINPROGRESS)
+ break;
+ if (err) got_err=1;
+ cbs[i] = 0;
+ }
+ if (i==cnt) {
+ if (got_err) {
+ errno = EIO;
+ return -1;
+ }
+ return 0;
+ }
+ if (aio_suspend(cbs, cnt, 0))
+ return -1;
+ }
+}
+
+static void notify_signal(struct sigevent *sev)
+{
+ siginfo_t si = {
+ .si_signo = sev->sigev_signo,
+ .si_value = sev->sigev_value,
+ .si_code = SI_ASYNCIO,
+ .si_pid = __pthread_self()->pid,
+ .si_uid = getuid()
+ };
+ __syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si);
+}
+
+static void *wait_thread(void *p)
+{
+ struct lio_state *st = p;
+ struct sigevent *sev = st->sev;
+ lio_wait(st);
+ free(st);
+ switch (sev->sigev_notify) {
+ case SIGEV_SIGNAL:
+ notify_signal(sev);
+ break;
+ case SIGEV_THREAD:
+ sev->sigev_notify_function(sev->sigev_value);
+ break;
+ }
+ return 0;
+}
+
+int lio_listio(int mode, struct aiocb *const cbs[], int cnt, struct sigevent *sev)
+{
+ int i, ret;
+ struct lio_state *st=0;
+
+ if (cnt < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (mode == LIO_WAIT || (sev && sev->sigev_notify != SIGEV_NONE)) {
+ if (!(st = malloc(sizeof *st + cnt*sizeof *cbs))) {
+ errno = EAGAIN;
+ return -1;
+ }
+ st->cnt = cnt;
+ st->sev = sev;
+ memcpy(st->cbs, cbs, cnt*sizeof *cbs);
+ }
+
+ for (i=0; i<cnt; i++) {
+ if (!cbs[i]) continue;
+ switch (cbs[i]->aio_lio_opcode) {
+ case LIO_READ:
+ ret = aio_read(cbs[i]);
+ break;
+ case LIO_WRITE:
+ ret = aio_write(cbs[i]);
+ break;
+ default:
+ continue;
+ }
+ if (ret) {
+ free(st);
+ errno = EAGAIN;
+ return -1;
+ }
+ }
+
+ if (mode == LIO_WAIT) {
+ ret = lio_wait(st);
+ free(st);
+ return 0;
+ }
+
+ if (st) {
+ pthread_attr_t a;
+ sigset_t set;
+ pthread_t td;
+
+ if (sev->sigev_notify == SIGEV_THREAD) {
+ if (sev->sigev_notify_attributes)
+ a = *sev->sigev_notify_attributes;
+ else
+ pthread_attr_init(&a);
+ } else {
+ pthread_attr_init(&a);
+ pthread_attr_setstacksize(&a, PAGE_SIZE);
+ pthread_attr_setguardsize(&a, 0);
+ }
+ pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);
+ sigfillset(&set);
+ pthread_sigmask(SIG_BLOCK, &set, &set);
+ if (pthread_create(&td, &a, wait_thread, st)) {
+ free(st);
+ errno = EAGAIN;
+ return -1;
+ }
+ pthread_sigmask(SIG_SETMASK, &set, 0);
+ }
+
+ return 0;
+}
+