From fdd2406a066a41ff8a28f7c2229907702e86d1f0 Mon Sep 17 00:00:00 2001
From: mpi
Date: Sun, 24 Oct 2021 11:23:22 +0000
Subject: [PATCH] Implement poll(2), select(2), ppoll(2) & pselect(2) on top
 of kqueue.

The given set of fds is converted to equivalent kevents using EV_SET(2)
and passed to the scanning internals of kevent(2): kqueue_scan().

ktrace(1) will now output the converted kevents on top of the usual set
bits, to make it possible to spot errors in the conversion.

This switch implies that poll(2) and select(2) will now query underlying
kqfilters instead of the *_poll() routines.

An increase in latency is visible, especially with UDP sockets and
NET_LOCK()-contended subsystems; it will be addressed in the next steps.

Based on similar work done on macOS and DragonFlyBSD with input from
visa@, millert@, anton@, cheloha@, thanks!

Tested by many, thanks!

ok claudio@, bluhm@
---
 sys/kern/sys_generic.c         | 540 ++++++++++++++++++++++++++------
 sys/miscfs/fifofs/fifo_vnops.c |   8 +-
 2 files changed, 432 insertions(+), 116 deletions(-)
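For readers who want to see the conversion in isolation, the sketch below
does the equivalent from userland: interest in a descriptor is expressed
with EV_SET(2) and collected with kevent(2) instead of fd_set bits.
Illustrative only, not part of the patch; the kernel additionally tags each
kevent with the internal __EV_POLL flag, which has no userland counterpart
here.

/*
 * Userland sketch of the fd -> kevent mapping described above.
 * Watches stdin for read and stdout for write readiness, much like
 * select(2) on the corresponding fd_set bits would.
 */
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

#include <err.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	struct kevent kev[2], out[2];
	struct timespec ts = { 5, 0 };	/* 5 second timeout */
	int kq, i, n;

	if ((kq = kqueue()) == -1)
		err(1, "kqueue");

	/* "readfds" bit -> EVFILT_READ, "writefds" bit -> EVFILT_WRITE */
	EV_SET(&kev[0], STDIN_FILENO, EVFILT_READ,
	    EV_ADD|EV_ENABLE|EV_ONESHOT, 0, 0, NULL);
	EV_SET(&kev[1], STDOUT_FILENO, EVFILT_WRITE,
	    EV_ADD|EV_ENABLE|EV_ONESHOT, 0, 0, NULL);

	if ((n = kevent(kq, kev, 2, out, 2, &ts)) == -1)
		err(1, "kevent");
	for (i = 0; i < n; i++)
		printf("fd %lu %s\n", (unsigned long)out[i].ident,
		    out[i].filter == EVFILT_READ ? "readable" : "writable");
	return (0);
}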
diff --git a/sys/kern/sys_generic.c b/sys/kern/sys_generic.c
index 1ab1c808b9e..cbe5d04c54f 100644
--- a/sys/kern/sys_generic.c
+++ b/sys/kern/sys_generic.c
@@ -1,4 +1,4 @@
-/*	$OpenBSD: sys_generic.c,v 1.137 2021/10/15 06:59:57 mpi Exp $	*/
+/*	$OpenBSD: sys_generic.c,v 1.138 2021/10/24 11:23:22 mpi Exp $	*/
 /*	$NetBSD: sys_generic.c,v 1.24 1996/03/29 00:25:32 cgd Exp $	*/
 
 /*
@@ -55,6 +55,7 @@
 #include
 #include
 #include
+#include
 #ifdef KTRACE
 #include
 #endif
@@ -66,8 +67,23 @@
 #include
 
-int selscan(struct proc *, fd_set *, fd_set *, int, int, register_t *);
-void pollscan(struct proc *, struct pollfd *, u_int, register_t *);
+/*
+ * Debug values:
+ *  1 - print implementation errors, things that should not happen.
+ *  2 - print ppoll(2) information, somewhat verbose
+ *  3 - print pselect(2) and ppoll(2) information, very verbose
+ */
+int kqpoll_debug = 0;
+#define DPRINTFN(v, x...) if (kqpoll_debug > v) {			\
+	printf("%s(%d): ", curproc->p_p->ps_comm, curproc->p_tid);	\
+	printf(x);							\
+}
+
+int pselregister(struct proc *, fd_set *[], fd_set *[], int, int *, int *);
+int pselcollect(struct proc *, struct kevent *, fd_set *[], int *);
+int ppollregister(struct proc *, struct pollfd *, int, int *);
+int ppollcollect(struct proc *, struct kevent *, struct pollfd *, u_int);
+
 int pollout(struct pollfd *, struct pollfd *, u_int);
 int dopselect(struct proc *, int, fd_set *, fd_set *, fd_set *,
     struct timespec *, const sigset_t *, register_t *);
@@ -584,11 +600,10 @@ int
 dopselect(struct proc *p, int nd, fd_set *in, fd_set *ou, fd_set *ex,
     struct timespec *timeout, const sigset_t *sigmask, register_t *retval)
 {
+	struct kqueue_scan_state scan;
 	fd_mask bits[6];
 	fd_set *pibits[3], *pobits[3];
-	struct timespec elapsed, start, stop;
-	uint64_t nsecs;
-	int s, ncoll, error = 0;
+	int error, ncollected = 0, nevents = 0;
 	u_int ni;
 
 	if (nd < 0)
@@ -618,6 +633,8 @@ dopselect(struct proc *p, int nd, fd_set *in, fd_set *ou, fd_set *ex,
 		pobits[2] = (fd_set *)&bits[5];
 	}
 
+	kqpoll_init();
+
 #define	getbits(name, x) \
 	if (name && (error = copyin(name, pibits[x], ni))) \
 		goto done;
@@ -636,43 +653,61 @@ dopselect(struct proc *p, int nd, fd_set *in, fd_set *ou, fd_set *ex,
 	if (sigmask)
 		dosigsuspend(p, *sigmask &~ sigcantmask);
 
-retry:
-	ncoll = nselcoll;
-	atomic_setbits_int(&p->p_flag, P_SELECT);
-	error = selscan(p, pibits[0], pobits[0], nd, ni, retval);
-	if (error || *retval)
+	/* Register kqueue events */
+	error = pselregister(p, pibits, pobits, nd, &nevents, &ncollected);
+	if (error != 0)
 		goto done;
-	if (timeout == NULL || timespecisset(timeout)) {
-		if (timeout != NULL) {
-			getnanouptime(&start);
-			nsecs = MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP);
-		} else
-			nsecs = INFSLP;
-		s = splhigh();
-		if ((p->p_flag & P_SELECT) == 0 || nselcoll != ncoll) {
-			splx(s);
-			goto retry;
-		}
-		atomic_clearbits_int(&p->p_flag, P_SELECT);
-		error = tsleep_nsec(&selwait, PSOCK | PCATCH, "select", nsecs);
-		splx(s);
+
+	/*
+	 * The poll/select family of syscalls has been designed to
+	 * block when file descriptors are not available, even if
+	 * there's nothing to wait for.
+	 */
+	if (nevents == 0 && ncollected == 0) {
+		uint64_t nsecs = INFSLP;
+
 		if (timeout != NULL) {
-			getnanouptime(&stop);
-			timespecsub(&stop, &start, &elapsed);
-			timespecsub(timeout, &elapsed, timeout);
-			if (timeout->tv_sec < 0)
-				timespecclear(timeout);
+			if (!timespecisset(timeout))
+				goto done;
+			nsecs = MAX(1, MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP));
 		}
-		if (error == 0 || error == EWOULDBLOCK)
-			goto retry;
+		error = tsleep_nsec(&p->p_kq, PSOCK | PCATCH, "kqsel", nsecs);
+		/* select is not restarted after signals... */
+		if (error == ERESTART)
+			error = EINTR;
+		if (error == EWOULDBLOCK)
+			error = 0;
+		goto done;
 	}
+
+	/* Collect at most `nevents' possibly waiting in kqueue_scan() */
+	kqueue_scan_setup(&scan, p->p_kq);
+	while (nevents > 0) {
+		struct kevent kev[KQ_NEVENTS];
+		int i, ready, count;
+
+		/* Maximum number of events per iteration */
+		count = MIN(nitems(kev), nevents);
+		ready = kqueue_scan(&scan, count, kev, timeout, p, &error);
+#ifdef KTRACE
+		if (KTRPOINT(p, KTR_STRUCT))
+			ktrevent(p, kev, ready);
+#endif
+		/* Convert back events that are ready. */
+		for (i = 0; i < ready && error == 0; i++)
+			error = pselcollect(p, &kev[i], pobits, &ncollected);
+		/*
+		 * Stop if there was an error or if we had enough
+		 * space to collect all events that were ready.
+		 */
+		if (error || ready < count)
+			break;
+
+		nevents -= ready;
+	}
+	kqueue_scan_finish(&scan);
+	*retval = ncollected;
 done:
-	atomic_clearbits_int(&p->p_flag, P_SELECT);
-	/* select is not restarted after signals... */
-	if (error == ERESTART)
-		error = EINTR;
-	if (error == EWOULDBLOCK)
-		error = 0;
 #define	putbits(name, x) \
 	if (name && (error2 = copyout(pobits[x], name, ni))) \
 		error = error2;
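The loop above drains the queue in KQ_NEVENTS-sized batches: a scan that
returns fewer events than requested means nothing else is pending. A minimal
userland analogue of the same loop structure, with kevent(2) standing in for
the kernel-internal kqueue_scan() and a hypothetical NBATCH standing in for
KQ_NEVENTS (the kernel version blocks according to the syscall timeout; this
sketch polls without blocking):

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

#define NBATCH	8	/* stand-in for the kernel's KQ_NEVENTS */

int
collect_all(int kq, int nevents,
    void (*collect)(const struct kevent *, void *), void *arg)
{
	const struct timespec ts = { 0, 0 };	/* do not block */
	struct kevent kev[NBATCH];
	int i, ready, count;

	while (nevents > 0) {
		count = nevents < NBATCH ? nevents : NBATCH;
		ready = kevent(kq, NULL, 0, kev, count, &ts);
		if (ready == -1)
			return (-1);
		for (i = 0; i < ready; i++)
			collect(&kev[i], arg);
		/* Fewer than requested: the queue is drained. */
		if (ready < count)
			break;
		nevents -= ready;
	}
	return (0);
}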
@@ -694,40 +729,112 @@ done:
 	if (pibits[0] != (fd_set *)&bits[0])
 		free(pibits[0], M_TEMP, 6 * ni);
+
+	kqueue_purge(p, p->p_kq);
+	p->p_kq_serial += nd;
+
 	return (error);
 }
 
+/*
+ * Convert fd_set into kqueue events and register them on the
+ * per-thread queue.
+ */
 int
-selscan(struct proc *p, fd_set *ibits, fd_set *obits, int nfd, int ni,
-    register_t *retval)
+pselregister(struct proc *p, fd_set *pibits[3], fd_set *pobits[3], int nfd,
+    int *nregistered, int *ncollected)
 {
-	caddr_t cibits = (caddr_t)ibits, cobits = (caddr_t)obits;
-	struct filedesc *fdp = p->p_fd;
-	int msk, i, j, fd;
+	static const int evf[] = { EVFILT_READ, EVFILT_WRITE, EVFILT_EXCEPT };
+	static const int evff[] = { 0, 0, NOTE_OOB };
+	int msk, i, j, fd, nevents = 0, error = 0;
+	struct kevent kev;
 	fd_mask bits;
-	struct file *fp;
-	int n = 0;
-	static const int flag[3] = { POLLIN, POLLOUT|POLL_NOHUP, POLLPRI };
 
 	for (msk = 0; msk < 3; msk++) {
-		fd_set *pibits = (fd_set *)&cibits[msk*ni];
-		fd_set *pobits = (fd_set *)&cobits[msk*ni];
-
 		for (i = 0; i < nfd; i += NFDBITS) {
-			bits = pibits->fds_bits[i/NFDBITS];
+			bits = pibits[msk]->fds_bits[i / NFDBITS];
 			while ((j = ffs(bits)) && (fd = i + --j) < nfd) {
 				bits &= ~(1 << j);
-				if ((fp = fd_getfile(fdp, fd)) == NULL)
-					return (EBADF);
-				if ((*fp->f_ops->fo_poll)(fp, flag[msk], p)) {
-					FD_SET(fd, pobits);
-					n++;
+
+				DPRINTFN(2, "select fd %d mask %d serial %lu\n",
+				    fd, msk, p->p_kq_serial);
+				EV_SET(&kev, fd, evf[msk],
+				    EV_ADD|EV_ENABLE|EV_ONESHOT|__EV_POLL,
+				    evff[msk], 0, (void *)(p->p_kq_serial));
+#ifdef KTRACE
+				if (KTRPOINT(p, KTR_STRUCT))
+					ktrevent(p, &kev, 1);
+#endif
+				error = kqueue_register(p->p_kq, &kev, p);
+				switch (error) {
+				case 0:
+					nevents++;
+				/* FALLTHROUGH */
+				case EOPNOTSUPP:/* No underlying kqfilter */
+				case EINVAL:	/* Unimplemented filter */
+				case EPERM:	/* Specific to FIFO */
+					error = 0;
+					break;
+				case EPIPE:	/* Specific to pipes */
+					KASSERT(kev.filter == EVFILT_WRITE);
+					FD_SET(kev.ident, pobits[1]);
+					(*ncollected)++;
+					error = 0;
+					break;
+				case ENXIO:	/* Device has been detached */
+				default:
+					goto bad;
 				}
-				FRELE(fp, p);
 			}
 		}
 	}
-	*retval = n;
+
+	*nregistered = nevents;
 	return (0);
+bad:
+	DPRINTFN(0, "select fd %u filt %d error %d\n", (int)kev.ident,
+	    kev.filter, error);
+	return (error);
 }
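pselregister() above tags every registration with the thread's current
`p_kq_serial' in `udata'; pselcollect() below drops any collected event
whose tag differs, which is how stale knotes left over from an earlier
syscall are recognized. A hypothetical stand-alone version of that check:

#include <sys/types.h>
#include <sys/event.h>

/* Illustrative only: mirrors the udata/serial comparison, nothing more. */
static int
event_is_current(const struct kevent *kevp, unsigned long serial)
{
	/* Tagged by the registering syscall; stale if it differs. */
	return ((unsigned long)kevp->udata == serial);
}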
 
+/*
+ * Convert given kqueue event into corresponding select(2) bit.
+ */
+int
+pselcollect(struct proc *p, struct kevent *kevp, fd_set *pobits[3],
+    int *ncollected)
+{
+	/* Filter out and lazily delete spurious events */
+	if ((unsigned long)kevp->udata != p->p_kq_serial) {
+		DPRINTFN(0, "select fd %u mismatched serial %lu\n",
+		    (int)kevp->ident, p->p_kq_serial);
+		kevp->flags = EV_DISABLE|EV_DELETE;
+		kqueue_register(p->p_kq, kevp, p);
+		return (0);
+	}
+
+	if (kevp->flags & EV_ERROR) {
+		DPRINTFN(2, "select fd %d filt %d error %d\n",
+		    (int)kevp->ident, kevp->filter, (int)kevp->data);
+		return (kevp->data);
+	}
+
+	switch (kevp->filter) {
+	case EVFILT_READ:
+		FD_SET(kevp->ident, pobits[0]);
+		break;
+	case EVFILT_WRITE:
+		FD_SET(kevp->ident, pobits[1]);
+		break;
+	case EVFILT_EXCEPT:
+		FD_SET(kevp->ident, pobits[2]);
+		break;
+	default:
+		KASSERT(0);
+	}
+	(*ncollected)++;
+
+	DPRINTFN(2, "select fd %d filt %d\n", (int)kevp->ident, kevp->filter);
 	return (0);
 }
 
@@ -802,31 +909,134 @@ doselwakeup(struct selinfo *sip)
 	}
 }
 
-void
-pollscan(struct proc *p, struct pollfd *pl, u_int nfd, register_t *retval)
+int
+ppollregister_evts(struct proc *p, struct kevent *kevp, int nkev,
+    struct pollfd *pl)
 {
-	struct filedesc *fdp = p->p_fd;
-	struct file *fp;
-	u_int i;
-	int n = 0;
+	int i, error, nevents = 0;
 
-	for (i = 0; i < nfd; i++, pl++) {
-		/* Check the file descriptor. */
-		if (pl->fd < 0) {
-			pl->revents = 0;
-			continue;
+	KASSERT(pl->revents == 0);
+
+#ifdef KTRACE
+	if (KTRPOINT(p, KTR_STRUCT))
+		ktrevent(p, kevp, nkev);
+#endif
+	for (i = 0; i < nkev; i++, kevp++) {
+again:
+		error = kqueue_register(p->p_kq, kevp, p);
+		switch (error) {
+		case 0:
+			nevents++;
+			break;
+		case EOPNOTSUPP:/* No underlying kqfilter */
+		case EINVAL:	/* Unimplemented filter */
+			break;
+		case EBADF:	/* Bad file descriptor */
+			pl->revents |= POLLNVAL;
+			break;
+		case EPERM:	/* Specific to FIFO */
+			KASSERT(kevp->filter == EVFILT_WRITE);
+			if (nkev == 1) {
+				/*
+				 * If this is the only filter make sure
+				 * POLLHUP is passed to userland.
+				 */
+				kevp->filter = EVFILT_EXCEPT;
+				goto again;
+			}
+			break;
+		case EPIPE:	/* Specific to pipes */
+			KASSERT(kevp->filter == EVFILT_WRITE);
+			pl->revents |= POLLHUP;
+			break;
+		default:
+#ifdef DIAGNOSTIC
+			DPRINTFN(0, "poll err %lu fd %d revents %02x serial"
+			    " %lu filt %d ERROR=%d\n",
+			    ((unsigned long)kevp->udata - p->p_kq_serial),
+			    pl->fd, pl->revents, p->p_kq_serial, kevp->filter,
+			    error);
+#endif
+			/* FALLTHROUGH */
+		case ENXIO:	/* Device has been detached */
+			pl->revents |= POLLERR;
+			break;
 		}
-		if ((fp = fd_getfile(fdp, pl->fd)) == NULL) {
-			pl->revents = POLLNVAL;
-			n++;
+	}
+
+	return (nevents);
+}
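A single pollfd fans out into up to three kevents, one per filter, as
ppollregister() below implements. A simplified userland version of the same
expansion (hypothetical helper; the kernel-only __EV_POLL flag, NOTE_OOB and
the forced-POLLHUP case are omitted):

#include <sys/types.h>
#include <sys/event.h>
#include <poll.h>

static int
pollfd_to_kevents(const struct pollfd *pfd, struct kevent kev[3])
{
	int nkev = 0;

	if (pfd->events & (POLLIN | POLLRDNORM)) {
		EV_SET(&kev[nkev], pfd->fd, EVFILT_READ,
		    EV_ADD|EV_ENABLE|EV_ONESHOT, 0, 0, NULL);
		nkev++;
	}
	if (pfd->events & (POLLOUT | POLLWRNORM)) {
		EV_SET(&kev[nkev], pfd->fd, EVFILT_WRITE,
		    EV_ADD|EV_ENABLE|EV_ONESHOT, 0, 0, NULL);
		nkev++;
	}
	if (pfd->events & (POLLPRI | POLLRDBAND)) {
		EV_SET(&kev[nkev], pfd->fd, EVFILT_EXCEPT,
		    EV_ADD|EV_ENABLE|EV_ONESHOT, 0, 0, NULL);
		nkev++;
	}
	return (nkev);
}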
 
+/*
+ * Convert pollfd into kqueue events and register them on the
+ * per-thread queue.
+ *
+ * Return the number of pollfd that triggered at least one error and aren't
+ * completely monitored.  These pollfd should have the corresponding error
+ * bit set in `revents'.
+ *
+ * At most 3 events can correspond to a single pollfd.
+ */
+int
+ppollregister(struct proc *p, struct pollfd *pl, int nfds, int *nregistered)
+{
+	int i, nkev, nevt, errcount = 0, forcehup = 0;
+	struct kevent kev[3], *kevp;
+
+	for (i = 0; i < nfds; i++) {
+		pl[i].events &= ~POLL_NOHUP;
+		pl[i].revents = 0;
+
+		if (pl[i].fd < 0)
+			continue;
+
+		if (pl[i].events == 0)
+			forcehup = 1;
+
+		DPRINTFN(1, "poll set %d/%d fd %d events %02x serial %lu\n",
+		    i+1, nfds, pl[i].fd, pl[i].events, p->p_kq_serial);
+
+		nevt = 0;
+		nkev = 0;
+		kevp = kev;
+		if (pl[i].events & (POLLIN | POLLRDNORM)) {
+			EV_SET(kevp, pl[i].fd, EVFILT_READ,
+			    EV_ADD|EV_ENABLE|EV_ONESHOT|__EV_POLL, 0, 0,
+			    (void *)(p->p_kq_serial + i));
+			nkev++;
+			kevp++;
+		}
+		if (pl[i].events & (POLLOUT | POLLWRNORM)) {
+			EV_SET(kevp, pl[i].fd, EVFILT_WRITE,
+			    EV_ADD|EV_ENABLE|EV_ONESHOT|__EV_POLL, 0, 0,
+			    (void *)(p->p_kq_serial + i));
+			nkev++;
+			kevp++;
 		}
-		pl->revents = (*fp->f_ops->fo_poll)(fp, pl->events, p);
-		FRELE(fp, p);
-		if (pl->revents != 0)
-			n++;
+		if ((pl[i].events & (POLLPRI | POLLRDBAND)) || forcehup) {
+			int evff = forcehup ? 0 : NOTE_OOB;
+
+			EV_SET(kevp, pl[i].fd, EVFILT_EXCEPT,
+			    EV_ADD|EV_ENABLE|EV_ONESHOT|__EV_POLL, evff, 0,
+			    (void *)(p->p_kq_serial + i));
+			nkev++;
+			kevp++;
+		}
+
+		if (nkev == 0)
+			continue;
+
+		nevt = ppollregister_evts(p, kev, nkev, &pl[i]);
+		if (nevt == 0 && !forcehup)
+			errcount++;
+		*nregistered += nevt;
 	}
-	*retval = n;
+
+#ifdef DIAGNOSTIC
+	DPRINTFN(1, "poll registered = %d, errors = %d\n", *nregistered,
+	    errcount);
+#endif
+	return (errcount);
 }
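doppoll() below keeps a long-standing behaviour: when nothing could be
registered, the syscall still blocks for the full timeout instead of
returning at once. Programs rely on this, for instance when using poll(2)
with no descriptors as a portable sub-second sleep:

#include <poll.h>

/* Illustrative only: a zero-fd poll(2) simply blocks for the timeout. */
static void
msleep_via_poll(int ms)
{
	(void)poll(NULL, 0, ms);
}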
 
 /*
@@ -916,11 +1126,10 @@ int
 doppoll(struct proc *p, struct pollfd *fds, u_int nfds,
     struct timespec *timeout, const sigset_t *sigmask, register_t *retval)
 {
-	size_t sz;
+	struct kqueue_scan_state scan;
 	struct pollfd pfds[4], *pl = pfds;
-	struct timespec elapsed, start, stop;
-	uint64_t nsecs;
-	int ncoll, i, s, error;
+	int error, nevents = 0;
+	size_t sz;
 
 	/* Standards say no more than MAX_OPEN; this is possibly better. */
 	if (nfds > min((int)lim_cur(RLIMIT_NOFILE), maxfiles))
@@ -934,58 +1143,75 @@ doppoll(struct proc *p, struct pollfd *fds, u_int nfds,
 		return (EINVAL);
 	}
 
+	kqpoll_init();
+
 	sz = nfds * sizeof(*pl);
 	if ((error = copyin(fds, pl, sz)) != 0)
 		goto bad;
 
-	for (i = 0; i < nfds; i++) {
-		pl[i].events &= ~POLL_NOHUP;
-		pl[i].revents = 0;
-	}
-
 	if (sigmask)
 		dosigsuspend(p, *sigmask &~ sigcantmask);
 
-retry:
-	ncoll = nselcoll;
-	atomic_setbits_int(&p->p_flag, P_SELECT);
-	pollscan(p, pl, nfds, retval);
-	if (*retval)
-		goto done;
-	if (timeout == NULL || timespecisset(timeout)) {
-		if (timeout != NULL) {
-			getnanouptime(&start);
-			nsecs = MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP);
-		} else
-			nsecs = INFSLP;
-		s = splhigh();
-		if ((p->p_flag & P_SELECT) == 0 || nselcoll != ncoll) {
-			splx(s);
-			goto retry;
-		}
-		atomic_clearbits_int(&p->p_flag, P_SELECT);
-		error = tsleep_nsec(&selwait, PSOCK | PCATCH, "poll", nsecs);
-		splx(s);
+	/* Register kqueue events */
+	*retval = ppollregister(p, pl, nfds, &nevents);
+
+	/*
+	 * The poll/select family of syscalls has been designed to
+	 * block when file descriptors are not available, even if
+	 * there's nothing to wait for.
+	 */
+	if (nevents == 0) {
+		uint64_t nsecs = INFSLP;
+
 		if (timeout != NULL) {
-			getnanouptime(&stop);
-			timespecsub(&stop, &start, &elapsed);
-			timespecsub(timeout, &elapsed, timeout);
-			if (timeout->tv_sec < 0)
-				timespecclear(timeout);
+			if (!timespecisset(timeout))
+				goto done;
+			nsecs = MAX(1, MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP));
 		}
-		if (error == 0 || error == EWOULDBLOCK)
-			goto retry;
+
+		error = tsleep_nsec(&p->p_kq, PSOCK | PCATCH, "kqpoll", nsecs);
+		if (error == ERESTART)
+			error = EINTR;
+		if (error == EWOULDBLOCK)
+			error = 0;
+		goto done;
 	}
 
+	/* Collect at most `nevents' possibly waiting in kqueue_scan() */
+	kqueue_scan_setup(&scan, p->p_kq);
+	while (nevents > 0) {
+		struct kevent kev[KQ_NEVENTS];
+		int i, ready, count;
+
+		/* Maximum number of events per iteration */
+		count = MIN(nitems(kev), nevents);
+		ready = kqueue_scan(&scan, count, kev, timeout, p, &error);
+#ifdef KTRACE
+		if (KTRPOINT(p, KTR_STRUCT))
+			ktrevent(p, kev, ready);
+#endif
+		/* Convert back events that are ready. */
+		for (i = 0; i < ready; i++)
+			*retval += ppollcollect(p, &kev[i], pl, nfds);
+
+		/*
+		 * Stop if there was an error or if we had enough
+		 * space to collect all events that were ready.
+		 */
+		if (error || ready < count)
+			break;
+
+		nevents -= ready;
+	}
+	kqueue_scan_finish(&scan);
 done:
-	atomic_clearbits_int(&p->p_flag, P_SELECT);
 	/*
 	 * NOTE: poll(2) is not restarted after a signal and EWOULDBLOCK is
 	 * ignored (since the whole point is to see what would block).
 	 */
 	switch (error) {
-	case ERESTART:
+	case EINTR:
 		error = pollout(pl, fds, nfds);
 		if (error == 0)
 			error = EINTR;
@@ -1002,9 +1228,95 @@ done:
 bad:
 	if (pl != pfds)
 		free(pl, M_TEMP, sz);
+
+	kqueue_purge(p, p->p_kq);
+	p->p_kq_serial += nfds;
+
 	return (error);
 }
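ppollregister() stored `p_kq_serial + i' in each kevent's `udata', so
ppollcollect() below can recover the poll array slot by subtracting the
serial and bounds-checking the result. A hypothetical stand-alone version
of the decoding:

/* Illustrative only: mirrors the udata -> array index decoding below. */
static int
udata_to_index(unsigned long udata, unsigned long serial, unsigned int nfds)
{
	unsigned long i = udata - serial;

	/* Out of range means a stale event from an earlier syscall. */
	return (i < nfds ? (int)i : -1);
}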
 
+/*
+ * Convert given kqueue event into corresponding poll(2) revents bit.
+ */
+int
+ppollcollect(struct proc *p, struct kevent *kevp, struct pollfd *pl, u_int nfds)
+{
+	int already_seen;
+	unsigned long i;
+
+	/*  Extract poll array index */
+	i = (unsigned long)kevp->udata - p->p_kq_serial;
+
+	/*
+	 * Lazily delete spurious events.
+	 *
+	 * This should not happen as long as kqueue_purge() is called
+	 * at the end of every syscall.  It might be interesting to do
+	 * as DragonFlyBSD does and not always allocate a new knote in
+	 * kqueue_register(); with that, this lazy removal makes sense.
+	 */
+	if (i >= nfds) {
+		DPRINTFN(0, "poll get out of range udata %lu vs serial %lu\n",
+		    (unsigned long)kevp->udata, p->p_kq_serial);
+		kevp->flags = EV_DISABLE|EV_DELETE;
+		kqueue_register(p->p_kq, kevp, p);
+		return (0);
+	}
+	if ((int)kevp->ident != pl[i].fd) {
+		DPRINTFN(0, "poll get %lu/%d mismatch fd %u!=%d serial %lu\n",
+		    i+1, nfds, (int)kevp->ident, pl[i].fd, p->p_kq_serial);
+		return (0);
+	}
+
+	/*
+	 * A given descriptor may already have generated an error
+	 * against another filter during kqueue_register().
+	 *
+	 * Make sure to set the appropriate flags but do not
+	 * increment `*retval' more than once.
+	 */
+	already_seen = (pl[i].revents != 0);
+
+	switch (kevp->filter) {
+	case EVFILT_READ:
+		if (kevp->flags & __EV_HUP)
+			pl[i].revents |= POLLHUP;
+		if (pl[i].events & (POLLIN | POLLRDNORM))
+			pl[i].revents |= pl[i].events & (POLLIN | POLLRDNORM);
+		break;
+	case EVFILT_WRITE:
+		/* POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
+		if (kevp->flags & __EV_HUP) {
+			pl[i].revents |= POLLHUP;
+		} else if (pl[i].events & (POLLOUT | POLLWRNORM)) {
+			pl[i].revents |= pl[i].events & (POLLOUT | POLLWRNORM);
+		}
+		break;
+	case EVFILT_EXCEPT:
+		if (kevp->flags & __EV_HUP) {
+#ifdef DIAGNOSTIC
+			if (pl[i].events != 0 && pl[i].events != POLLOUT)
+				DPRINTFN(0, "weird events %x\n", pl[i].events);
+#endif
+			pl[i].revents |= POLLHUP;
+			break;
+		}
+		if (pl[i].events & (POLLPRI | POLLRDBAND))
+			pl[i].revents |= pl[i].events & (POLLPRI | POLLRDBAND);
+		break;
+	default:
+		KASSERT(0);
+	}
+
+	DPRINTFN(1, "poll get %lu/%d fd %d revents %02x serial %lu filt %d\n",
+	    i+1, nfds, pl[i].fd, pl[i].revents, (unsigned long)kevp->udata,
+	    kevp->filter);
+	if (!already_seen && (pl[i].revents != 0))
+		return (1);
+
+	return (0);
+}
+
 /*
  * utrace system call
  */
diff --git a/sys/miscfs/fifofs/fifo_vnops.c b/sys/miscfs/fifofs/fifo_vnops.c
index d6fc18692e6..c039fd411ec 100644
--- a/sys/miscfs/fifofs/fifo_vnops.c
+++ b/sys/miscfs/fifofs/fifo_vnops.c
@@ -1,4 +1,4 @@
-/*	$OpenBSD: fifo_vnops.c,v 1.84 2021/10/24 07:02:47 visa Exp $	*/
+/*	$OpenBSD: fifo_vnops.c,v 1.85 2021/10/24 11:23:22 mpi Exp $	*/
 /*	$NetBSD: fifo_vnops.c,v 1.18 1996/03/16 23:52:42 christos Exp $	*/
 
 /*
@@ -529,8 +529,12 @@ fifo_kqfilter(void *v)
 		sb = &so->so_rcv;
 		break;
 	case EVFILT_WRITE:
-		if (!(ap->a_fflag & FWRITE))
+		if (!(ap->a_fflag & FWRITE)) {
+			/* Tell upper layer to ask for POLLHUP only */
+			if (ap->a_kn->kn_flags & __EV_POLL)
+				return (EPERM);
 			return (EINVAL);
+		}
 		ap->a_kn->kn_fop = &fifowrite_filtops;
 		so = fip->fi_writesock;
 		sb = &so->so_snd;
-- 
2.20.1
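A quick way to exercise the rewritten paths is a small program driving both
select(2) and poll(2) over a pipe; run under ktrace(1), the converted
kevents should appear next to the usual fd sets in the kdump(1) output.
Illustrative smoke test, not part of the patch:

#include <sys/select.h>

#include <err.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	struct pollfd pfd;
	fd_set rfds;
	int pfds[2], n;

	if (pipe(pfds) == -1)
		err(1, "pipe");
	if (write(pfds[1], "x", 1) != 1)
		err(1, "write");

	/* select(2) path: the read end should be reported ready. */
	FD_ZERO(&rfds);
	FD_SET(pfds[0], &rfds);
	if ((n = select(pfds[0] + 1, &rfds, NULL, NULL, NULL)) == -1)
		err(1, "select");
	printf("select: %d ready, read bit %s\n", n,
	    FD_ISSET(pfds[0], &rfds) ? "set" : "clear");

	/* poll(2) path: same descriptor, POLLIN requested. */
	pfd.fd = pfds[0];
	pfd.events = POLLIN;
	if ((n = poll(&pfd, 1, 0)) == -1)
		err(1, "poll");
	printf("poll: %d ready, revents %#x\n", n, (unsigned)pfd.revents);
	return (0);
}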