From 66d924fbe4f33154eb3c6968080cc2d5b815e919 Mon Sep 17 00:00:00 2001 From: mpi Date: Thu, 14 Oct 2021 08:46:01 +0000 Subject: [PATCH] Implement select(2) and pselect(2) on top of kqueue. The given set of fds are converted to equivalent kevents using EV_SET(2) and passed to the scanning internals of kevent(2): kqueue_scan(). Those events are lazily deleted to reduce the overhard of freeing/allocating them when select(2) is called in a loop. ktrace(1) will now output the converted kevents on top of the usuals set bits to be able to find possible error in the convertion. This switch implies that select(2) and pselect(2) will now query the underlying kqfilters instead of the *_poll() routines. An increase in latency is visible, especially with UDP sockets and NET_LOCK()-contended subsystems and will be addressed in a next step. The various *_poll() routines could be removed as soon as poll(2) and ppoll(2) are also converted. Based on similar work done on DragonFlyBSD with inputs from from visa@, millert@, anton@, cheloha@, thanks! ok claudio@, bluhm@ --- sys/kern/sys_generic.c | 213 ++++++++++++++++++++++++++++++----------- 1 file changed, 155 insertions(+), 58 deletions(-) diff --git a/sys/kern/sys_generic.c b/sys/kern/sys_generic.c index 025aa237e69..5b5a7df6c2c 100644 --- a/sys/kern/sys_generic.c +++ b/sys/kern/sys_generic.c @@ -1,4 +1,4 @@ -/* $OpenBSD: sys_generic.c,v 1.135 2021/01/08 09:29:04 visa Exp $ */ +/* $OpenBSD: sys_generic.c,v 1.136 2021/10/14 08:46:01 mpi Exp $ */ /* $NetBSD: sys_generic.c,v 1.24 1996/03/29 00:25:32 cgd Exp $ */ /* @@ -55,6 +55,7 @@ #include #include #include +#include #ifdef KTRACE #include #endif @@ -66,8 +67,21 @@ #include -int selscan(struct proc *, fd_set *, fd_set *, int, int, register_t *); -void pollscan(struct proc *, struct pollfd *, u_int, register_t *); +/* + * Debug values: + * 1 - print implementation errors, things that should not happen. + * 2 - print ppoll(2) information, somewhat verbose + * 3 - print pselect(2) and ppoll(2) information, very verbose + */ +int kqpoll_debug = 0; +#define DPRINTFN(v, x...) if (kqpoll_debug > v) { \ + printf("%s(%d): ", curproc->p_p->ps_comm, curproc->p_tid); \ + printf(x); \ +} + +int pselregister(struct proc *, fd_set *[], int, int *); +int pselcollect(struct proc *, struct kevent *, fd_set *[], int *); + int pollout(struct pollfd *, struct pollfd *, u_int); int dopselect(struct proc *, int, fd_set *, fd_set *, fd_set *, struct timespec *, const sigset_t *, register_t *); @@ -584,11 +598,10 @@ int dopselect(struct proc *p, int nd, fd_set *in, fd_set *ou, fd_set *ex, struct timespec *timeout, const sigset_t *sigmask, register_t *retval) { + struct kqueue_scan_state scan; fd_mask bits[6]; fd_set *pibits[3], *pobits[3]; - struct timespec elapsed, start, stop; - uint64_t nsecs; - int s, ncoll, error = 0; + int error, ncollected = 0, nevents = 0; u_int ni; if (nd < 0) @@ -618,6 +631,8 @@ dopselect(struct proc *p, int nd, fd_set *in, fd_set *ou, fd_set *ex, pobits[2] = (fd_set *)&bits[5]; } + kqpoll_init(); + #define getbits(name, x) \ if (name && (error = copyin(name, pibits[x], ni))) \ goto done; @@ -636,43 +651,61 @@ dopselect(struct proc *p, int nd, fd_set *in, fd_set *ou, fd_set *ex, if (sigmask) dosigsuspend(p, *sigmask &~ sigcantmask); -retry: - ncoll = nselcoll; - atomic_setbits_int(&p->p_flag, P_SELECT); - error = selscan(p, pibits[0], pobits[0], nd, ni, retval); - if (error || *retval) + /* Register kqueue events */ + error = pselregister(p, pibits, nd, &nevents); + if (error != 0) goto done; - if (timeout == NULL || timespecisset(timeout)) { - if (timeout != NULL) { - getnanouptime(&start); - nsecs = MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP); - } else - nsecs = INFSLP; - s = splhigh(); - if ((p->p_flag & P_SELECT) == 0 || nselcoll != ncoll) { - splx(s); - goto retry; - } - atomic_clearbits_int(&p->p_flag, P_SELECT); - error = tsleep_nsec(&selwait, PSOCK | PCATCH, "select", nsecs); - splx(s); + + /* + * The poll/select family of syscalls has been designed to + * block when file descriptors are not available, even if + * there's nothing to wait for. + */ + if (nevents == 0) { + uint64_t nsecs = INFSLP; + if (timeout != NULL) { - getnanouptime(&stop); - timespecsub(&stop, &start, &elapsed); - timespecsub(timeout, &elapsed, timeout); - if (timeout->tv_sec < 0) - timespecclear(timeout); + if (!timespecisset(timeout)) + goto done; + nsecs = MAX(1, MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP)); } - if (error == 0 || error == EWOULDBLOCK) - goto retry; + error = tsleep_nsec(&p->p_kq, PSOCK | PCATCH, "kqsel", nsecs); + /* select is not restarted after signals... */ + if (error == ERESTART) + error = EINTR; + if (error == EWOULDBLOCK) + error = 0; + goto done; } -done: - atomic_clearbits_int(&p->p_flag, P_SELECT); - /* select is not restarted after signals... */ - if (error == ERESTART) - error = EINTR; - if (error == EWOULDBLOCK) - error = 0; + + /* Collect at most `nevents' possibly waiting in kqueue_scan() */ + kqueue_scan_setup(&scan, p->p_kq); + while (nevents > 0) { + struct kevent kev[KQ_NEVENTS]; + int i, ready, count; + + /* Maximum number of events per iteration */ + count = MIN(nitems(kev), nevents); + ready = kqueue_scan(&scan, count, kev, timeout, p, &error); +#ifdef KTRACE + if (KTRPOINT(p, KTR_STRUCT)) + ktrevent(p, kev, ready); +#endif + /* Convert back events that are ready. */ + for (i = 0; i < ready && error == 0; i++) + error = pselcollect(p, &kev[i], pobits, &ncollected); + /* + * Stop if there was an error or if we had enough + * space to collect all events that were ready. + */ + if (error || ready < count) + break; + + nevents -= ready; + } + kqueue_scan_finish(&scan); + *retval = ncollected; + done: #define putbits(name, x) \ if (name && (error2 = copyout(pobits[x], name, ni))) \ error = error2; @@ -694,40 +727,104 @@ done: if (pibits[0] != (fd_set *)&bits[0]) free(pibits[0], M_TEMP, 6 * ni); + + kqueue_purge(p, p->p_kq); + p->p_kq_serial += nd; + return (error); } +/* + * Convert fd_set into kqueue events and register them on the + * per-thread queue. + */ int -selscan(struct proc *p, fd_set *ibits, fd_set *obits, int nfd, int ni, - register_t *retval) +pselregister(struct proc *p, fd_set *pibits[3], int nfd, int *nregistered) { - caddr_t cibits = (caddr_t)ibits, cobits = (caddr_t)obits; - struct filedesc *fdp = p->p_fd; - int msk, i, j, fd; + static const int evf[] = { EVFILT_READ, EVFILT_WRITE, EVFILT_EXCEPT }; + static const int evff[] = { 0, 0, NOTE_OOB }; + int msk, i, j, fd, nevents = 0, error = 0; + struct kevent kev; fd_mask bits; - struct file *fp; - int n = 0; - static const int flag[3] = { POLLIN, POLLOUT|POLL_NOHUP, POLLPRI }; for (msk = 0; msk < 3; msk++) { - fd_set *pibits = (fd_set *)&cibits[msk*ni]; - fd_set *pobits = (fd_set *)&cobits[msk*ni]; - for (i = 0; i < nfd; i += NFDBITS) { - bits = pibits->fds_bits[i/NFDBITS]; + bits = pibits[msk]->fds_bits[i / NFDBITS]; while ((j = ffs(bits)) && (fd = i + --j) < nfd) { bits &= ~(1 << j); - if ((fp = fd_getfile(fdp, fd)) == NULL) - return (EBADF); - if ((*fp->f_ops->fo_poll)(fp, flag[msk], p)) { - FD_SET(fd, pobits); - n++; + + DPRINTFN(2, "select fd %d mask %d serial %lu\n", + fd, msk, p->p_kq_serial); + EV_SET(&kev, fd, evf[msk], + EV_ADD|EV_ENABLE|EV_ONESHOT|__EV_POLL, + evff[msk], 0, (void *)(p->p_kq_serial)); +#ifdef KTRACE + if (KTRPOINT(p, KTR_STRUCT)) + ktrevent(p, &kev, 1); +#endif + error = kqueue_register(p->p_kq, &kev, p); + switch (error) { + case 0: + nevents++; + /* FALLTHROUGH */ + case EOPNOTSUPP:/* No underlying kqfilter */ + case EINVAL: /* Unimplemented filter */ + error = 0; + break; + case ENXIO: /* Device has been detached */ + default: + goto bad; } - FRELE(fp, p); } } } - *retval = n; + + *nregistered = nevents; + return (0); +bad: + DPRINTFN(0, "select fd %u filt %d error %d\n", (int)kev.ident, + kev.filter, error); + return (error); +} + +/* + * Convert given kqueue event into corresponding select(2) bit. + */ +int +pselcollect(struct proc *p, struct kevent *kevp, fd_set *pobits[3], + int *ncollected) +{ + /* Filter out and lazily delete spurious events */ + if ((unsigned long)kevp->udata != p->p_kq_serial) { + DPRINTFN(0, "select fd %u mismatched serial %lu\n", + (int)kevp->ident, p->p_kq_serial); + kevp->flags = EV_DISABLE|EV_DELETE; + kqueue_register(p->p_kq, kevp, p); + return (0); + } + + if (kevp->flags & EV_ERROR) { + DPRINTFN(2, "select fd %d filt %d error %d\n", + (int)kevp->ident, kevp->filter, (int)kevp->data); + return (kevp->data); + } + + switch (kevp->filter) { + case EVFILT_READ: + FD_SET(kevp->ident, pobits[0]); + break; + case EVFILT_WRITE: + FD_SET(kevp->ident, pobits[1]); + break; + case EVFILT_EXCEPT: + FD_SET(kevp->ident, pobits[2]); + break; + default: + KASSERT(0); + } + (*ncollected)++; + + DPRINTFN(2, "select fd %d filt %d\n", (int)kevp->ident, kevp->filter); return (0); } -- 2.20.1