Unlock udp(4) somove().
authormvs <mvs@openbsd.org>
Sat, 20 Jul 2024 17:26:19 +0000 (17:26 +0000)
committermvs <mvs@openbsd.org>
Sat, 20 Jul 2024 17:26:19 +0000 (17:26 +0000)
Socket splicing belongs to sockets buffers. udp(4) sockets are fully
switched to fine-grained buffers locks, so use them instead of exclusive
solock().

Always schedule somove() thread to run as we do for tcp(4) case. This
brings delay to packet processing, but it is comparable wit non splicing
case where soreceive() threads are always scheduled.

So, now spliced udp(4) sockets rely on sb_lock() of `so_rcv' buffer
together with `sb_mtx' mutexes of both buffers. Shared solock() only
required around pru_send() call, so the most of somove() thread runs
simultaneously with network stack.

Also document 'sosplice' structure locking.

Feedback, tests and OK from bluhm.

sys/kern/uipc_socket.c
sys/netinet/udp_usrreq.c
sys/sys/socketvar.h

index a17ac58..5215791 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: uipc_socket.c,v 1.338 2024/07/14 15:42:23 bluhm Exp $ */
+/*     $OpenBSD: uipc_socket.c,v 1.339 2024/07/20 17:26:19 mvs Exp $   */
 /*     $NetBSD: uipc_socket.c,v 1.21 1996/02/04 02:17:52 christos Exp $        */
 
 /*
@@ -324,31 +324,22 @@ sofree(struct socket *so, int keep_lock)
                        sounlock(head);
        }
 
-       if (persocket) {
+       switch (so->so_proto->pr_domain->dom_family) {
+       case AF_INET:
+       case AF_INET6:
+               if (so->so_proto->pr_type == SOCK_STREAM)
+                       break;
+               /* FALLTHROUGH */
+       default:
                sounlock(so);
                refcnt_finalize(&so->so_refcnt, "sofinal");
                solock(so);
+               break;
        }
 
        sigio_free(&so->so_sigio);
        klist_free(&so->so_rcv.sb_klist);
        klist_free(&so->so_snd.sb_klist);
-#ifdef SOCKET_SPLICE
-       if (issplicedback(so)) {
-               int freeing = SOSP_FREEING_WRITE;
-
-               if (so->so_sp->ssp_soback == so)
-                       freeing |= SOSP_FREEING_READ;
-               sounsplice(so->so_sp->ssp_soback, so, freeing);
-       }
-       if (isspliced(so)) {
-               int freeing = SOSP_FREEING_READ;
-
-               if (so == so->so_sp->ssp_socket)
-                       freeing |= SOSP_FREEING_WRITE;
-               sounsplice(so, so->so_sp->ssp_socket, freeing);
-       }
-#endif /* SOCKET_SPLICE */
 
        mtx_enter(&so->so_snd.sb_mtx);
        sbrelease(so, &so->so_snd);
@@ -458,6 +449,85 @@ discard:
        if (so->so_state & SS_NOFDREF)
                panic("soclose NOFDREF: so %p, so_type %d", so, so->so_type);
        so->so_state |= SS_NOFDREF;
+
+#ifdef SOCKET_SPLICE
+       if (so->so_sp) {
+               struct socket *soback;
+
+               if (so->so_proto->pr_flags & PR_WANTRCVD) {
+                       /*
+                        * Copy - Paste, but can't relock and sleep in
+                        * sofree() in tcp(4) case. That's why tcp(4)
+                        * still rely on solock() for splicing and
+                        * unsplicing.
+                        */
+
+                       if (issplicedback(so)) {
+                               int freeing = SOSP_FREEING_WRITE;
+
+                               if (so->so_sp->ssp_soback == so)
+                                       freeing |= SOSP_FREEING_READ;
+                               sounsplice(so->so_sp->ssp_soback, so, freeing);
+                       }
+                       if (isspliced(so)) {
+                               int freeing = SOSP_FREEING_READ;
+
+                               if (so == so->so_sp->ssp_socket)
+                                       freeing |= SOSP_FREEING_WRITE;
+                               sounsplice(so, so->so_sp->ssp_socket, freeing);
+                       }
+                       goto free;
+               }
+
+               sounlock(so);
+               mtx_enter(&so->so_snd.sb_mtx);
+               /*
+                * Concurrent sounsplice() locks `sb_mtx' mutexes on
+                * both `so_snd' and `so_rcv' before unsplice sockets.
+                */
+               if ((soback = so->so_sp->ssp_soback) == NULL) {
+                       mtx_leave(&so->so_snd.sb_mtx);
+                       goto notsplicedback;
+               }
+               soref(soback);
+               mtx_leave(&so->so_snd.sb_mtx);
+
+               /*
+                * `so' can be only unspliced, and never spliced again.
+                * Thus if issplicedback(so) check is positive, socket is
+                * still spliced and `ssp_soback' points to the same
+                * socket that `soback'.
+                */
+               sblock(&soback->so_rcv, SBL_WAIT | SBL_NOINTR);
+               if (issplicedback(so)) {
+                       int freeing = SOSP_FREEING_WRITE;
+
+                       if (so->so_sp->ssp_soback == so)
+                               freeing |= SOSP_FREEING_READ;
+                       solock(soback);
+                       sounsplice(so->so_sp->ssp_soback, so, freeing);
+                       sounlock(soback);
+               }
+               sbunlock(&soback->so_rcv);
+               sorele(soback);
+
+notsplicedback:
+               sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+               if (isspliced(so)) {
+                       int freeing = SOSP_FREEING_READ;
+
+                       if (so == so->so_sp->ssp_socket)
+                               freeing |= SOSP_FREEING_WRITE;
+                       solock(so);
+                       sounsplice(so, so->so_sp->ssp_socket, freeing);
+                       sounlock(so);
+               }
+               sbunlock(&so->so_rcv);
+
+               solock(so);
+       }
+free:
+#endif /* SOCKET_SPLICE */
        /* sofree() calls sounlock(). */
        sofree(so, 0);
        return (error);
@@ -1411,14 +1481,6 @@ sosplice(struct socket *so, int fd, off_t max, struct timeval *tv)
                goto release;
        }
 
-       /* Splice so and sosp together. */
-       mtx_enter(&so->so_rcv.sb_mtx);
-       mtx_enter(&sosp->so_snd.sb_mtx);
-       so->so_sp->ssp_socket = sosp;
-       sosp->so_sp->ssp_soback = so;
-       mtx_leave(&sosp->so_snd.sb_mtx);
-       mtx_leave(&so->so_rcv.sb_mtx);
-
        so->so_splicelen = 0;
        so->so_splicemax = max;
        if (tv)
@@ -1429,9 +1491,20 @@ sosplice(struct socket *so, int fd, off_t max, struct timeval *tv)
        task_set(&so->so_splicetask, sotask, so);
 
        /*
-        * To prevent softnet interrupt from calling somove() while
-        * we sleep, the socket buffers are not marked as spliced yet.
+        * To prevent sorwakeup() calling somove() before this somove()
+        * has finished, the socket buffers are not marked as spliced yet.
         */
+
+       /* Splice so and sosp together. */
+       mtx_enter(&so->so_rcv.sb_mtx);
+       mtx_enter(&sosp->so_snd.sb_mtx);
+       so->so_sp->ssp_socket = sosp;
+       sosp->so_sp->ssp_soback = so;
+       mtx_leave(&sosp->so_snd.sb_mtx);
+       mtx_leave(&so->so_rcv.sb_mtx);
+
+       if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+               sounlock(so);
        if (somove(so, M_WAIT)) {
                mtx_enter(&so->so_rcv.sb_mtx);
                mtx_enter(&sosp->so_snd.sb_mtx);
@@ -1440,6 +1513,8 @@ sosplice(struct socket *so, int fd, off_t max, struct timeval *tv)
                mtx_leave(&sosp->so_snd.sb_mtx);
                mtx_leave(&so->so_rcv.sb_mtx);
        }
+       if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+               solock(so);
 
  release:
        sounlock(so);
@@ -1454,6 +1529,8 @@ sosplice(struct socket *so, int fd, off_t max, struct timeval *tv)
 void
 sounsplice(struct socket *so, struct socket *sosp, int freeing)
 {
+       if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+               sbassertlocked(&so->so_rcv);
        soassertlocked(so);
 
        task_del(sosplice_taskq, &so->so_splicetask);
@@ -1479,32 +1556,51 @@ soidle(void *arg)
 {
        struct socket *so = arg;
 
+       sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
        solock(so);
+       /*
+        * Depending on socket type, sblock(&so->so_rcv) or solock()
+        * is always held while modifying SB_SPLICE and
+        * so->so_sp->ssp_socket.
+        */
        if (so->so_rcv.sb_flags & SB_SPLICE) {
                so->so_error = ETIMEDOUT;
                sounsplice(so, so->so_sp->ssp_socket, 0);
        }
        sounlock(so);
+       sbunlock(&so->so_rcv);
 }
 
 void
 sotask(void *arg)
 {
        struct socket *so = arg;
+       int doyield = 0;
+       int sockstream = (so->so_proto->pr_flags & PR_WANTRCVD);
+
+       /*
+        * sblock() on `so_rcv' protects sockets from beind unspliced
+        * for UDP case. TCP sockets still rely on solock(). 
+        */
+
+       sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+       if (sockstream)
+               solock(so);
 
-       solock(so);
        if (so->so_rcv.sb_flags & SB_SPLICE) {
-               /*
-                * We may not sleep here as sofree() and unsplice() may be
-                * called from softnet interrupt context.  This would remove
-                * the socket during somove().
-                */
+               if (sockstream)
+                       doyield = 1;
                somove(so, M_DONTWAIT);
        }
-       sounlock(so);
 
-       /* Avoid user land starvation. */
-       yield();
+       if (sockstream)
+               sounlock(so);
+       sbunlock(&so->so_rcv);
+
+       if (doyield) {
+               /* Avoid user land starvation. */
+               yield();
+       }
 }
 
 /*
@@ -1546,24 +1642,32 @@ somove(struct socket *so, int wait)
        struct mbuf     *m, **mp, *nextrecord;
        u_long           len, off, oobmark;
        long             space;
-       int              error = 0, maxreached = 0;
+       int              error = 0, maxreached = 0, unsplice = 0;
        unsigned int     rcvstate;
+       int              sockdgram = ((so->so_proto->pr_flags &
+                            PR_WANTRCVD) == 0);
 
-       soassertlocked(so);
+       if (sockdgram)
+               sbassertlocked(&so->so_rcv);
+       else
+               soassertlocked(so);
+
+       mtx_enter(&so->so_rcv.sb_mtx);
+       mtx_enter(&sosp->so_snd.sb_mtx);
 
  nextpkt:
-       if (so->so_error) {
-               error = so->so_error;
+       if ((error = READ_ONCE(so->so_error)))
                goto release;
-       }
        if (sosp->so_snd.sb_state & SS_CANTSENDMORE) {
                error = EPIPE;
                goto release;
        }
-       if (sosp->so_error && sosp->so_error != ETIMEDOUT &&
-           sosp->so_error != EFBIG && sosp->so_error != ELOOP) {
-               error = sosp->so_error;
-               goto release;
+
+       error = READ_ONCE(sosp->so_error);
+       if (error) {
+               if (error != ETIMEDOUT && error != EFBIG && error != ELOOP)
+                       goto release;
+               error = 0;
        }
        if ((sosp->so_state & SS_ISCONNECTED) == 0)
                goto release;
@@ -1577,26 +1681,21 @@ somove(struct socket *so, int wait)
                        maxreached = 1;
                }
        }
-       mtx_enter(&sosp->so_snd.sb_mtx);
        space = sbspace_locked(sosp, &sosp->so_snd);
        if (so->so_oobmark && so->so_oobmark < len &&
            so->so_oobmark < space + 1024)
                space += 1024;
        if (space <= 0) {
-               mtx_leave(&sosp->so_snd.sb_mtx);
                maxreached = 0;
                goto release;
        }
        if (space < len) {
                maxreached = 0;
-               if (space < sosp->so_snd.sb_lowat) {
-                       mtx_leave(&sosp->so_snd.sb_mtx);
+               if (space < sosp->so_snd.sb_lowat)
                        goto release;
-               }
                len = space;
        }
        sosp->so_snd.sb_state |= SS_ISSENDING;
-       mtx_leave(&sosp->so_snd.sb_mtx);
 
        SBLASTRECORDCHK(&so->so_rcv, "somove 1");
        SBLASTMBUFCHK(&so->so_rcv, "somove 1");
@@ -1618,8 +1717,13 @@ somove(struct socket *so, int wait)
                m = m->m_next;
        if (m == NULL) {
                sbdroprecord(so, &so->so_rcv);
-               if (so->so_proto->pr_flags & PR_WANTRCVD)
+               if (so->so_proto->pr_flags & PR_WANTRCVD) {
+                       mtx_leave(&sosp->so_snd.sb_mtx);
+                       mtx_leave(&so->so_rcv.sb_mtx);
                        pru_rcvd(so);
+                       mtx_enter(&so->so_rcv.sb_mtx);
+                       mtx_enter(&sosp->so_snd.sb_mtx);
+               }
                goto nextpkt;
        }
 
@@ -1724,11 +1828,15 @@ somove(struct socket *so, int wait)
        }
 
        /* Send window update to source peer as receive buffer has changed. */
-       if (so->so_proto->pr_flags & PR_WANTRCVD)
+       if (so->so_proto->pr_flags & PR_WANTRCVD) {
+               mtx_leave(&sosp->so_snd.sb_mtx);
+               mtx_leave(&so->so_rcv.sb_mtx);
                pru_rcvd(so);
+               mtx_enter(&so->so_rcv.sb_mtx);
+               mtx_enter(&sosp->so_snd.sb_mtx);
+       }
 
        /* Receive buffer did shrink by len bytes, adjust oob. */
-       mtx_enter(&so->so_rcv.sb_mtx);
        rcvstate = so->so_rcv.sb_state;
        so->so_rcv.sb_state &= ~SS_RCVATMARK;
        oobmark = so->so_oobmark;
@@ -1739,7 +1847,6 @@ somove(struct socket *so, int wait)
                if (oobmark >= len)
                        oobmark = 0;
        }
-       mtx_leave(&so->so_rcv.sb_mtx);
 
        /*
         * Handle oob data.  If any malloc fails, ignore error.
@@ -1755,7 +1862,12 @@ somove(struct socket *so, int wait)
                } else if (oobmark) {
                        o = m_split(m, oobmark, wait);
                        if (o) {
+                               mtx_leave(&sosp->so_snd.sb_mtx);
+                               mtx_leave(&so->so_rcv.sb_mtx);
                                error = pru_send(sosp, m, NULL, NULL);
+                               mtx_enter(&so->so_rcv.sb_mtx);
+                               mtx_enter(&sosp->so_snd.sb_mtx);
+
                                if (error) {
                                        if (sosp->so_snd.sb_state &
                                            SS_CANTSENDMORE)
@@ -1773,7 +1885,13 @@ somove(struct socket *so, int wait)
                if (o) {
                        o->m_len = 1;
                        *mtod(o, caddr_t) = *mtod(m, caddr_t);
+
+                       mtx_leave(&sosp->so_snd.sb_mtx);
+                       mtx_leave(&so->so_rcv.sb_mtx);
                        error = pru_sendoob(sosp, o, NULL, NULL);
+                       mtx_enter(&so->so_rcv.sb_mtx);
+                       mtx_enter(&sosp->so_snd.sb_mtx);
+
                        if (error) {
                                if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
                                        error = EPIPE;
@@ -1791,15 +1909,25 @@ somove(struct socket *so, int wait)
                }
        }
 
-       mtx_enter(&sosp->so_snd.sb_mtx);
        /* Append all remaining data to drain socket. */
        if (so->so_rcv.sb_cc == 0 || maxreached)
                sosp->so_snd.sb_state &= ~SS_ISSENDING;
+
        mtx_leave(&sosp->so_snd.sb_mtx);
+       mtx_leave(&so->so_rcv.sb_mtx);
 
+       if (sockdgram)
+               solock_shared(sosp);
        error = pru_send(sosp, m, NULL, NULL);
+       if (sockdgram)
+               sounlock_shared(sosp);
+
+       mtx_enter(&so->so_rcv.sb_mtx);
+       mtx_enter(&sosp->so_snd.sb_mtx);
+
        if (error) {
-               if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
+               if (sosp->so_snd.sb_state & SS_CANTSENDMORE ||
+                   sosp->so_pcb == NULL)
                        error = EPIPE;
                goto release;
        }
@@ -1810,26 +1938,35 @@ somove(struct socket *so, int wait)
                goto nextpkt;
 
  release:
-       mtx_enter(&sosp->so_snd.sb_mtx);
        sosp->so_snd.sb_state &= ~SS_ISSENDING;
-       mtx_leave(&sosp->so_snd.sb_mtx);
 
        if (!error && maxreached && so->so_splicemax == so->so_splicelen)
                error = EFBIG;
        if (error)
-               so->so_error = error;
+               WRITE_ONCE(so->so_error, error);
+
        if (((so->so_rcv.sb_state & SS_CANTRCVMORE) &&
            so->so_rcv.sb_cc == 0) ||
            (sosp->so_snd.sb_state & SS_CANTSENDMORE) ||
-           maxreached || error) {
+           maxreached || error)
+               unsplice = 1;
+
+       mtx_leave(&sosp->so_snd.sb_mtx);
+       mtx_leave(&so->so_rcv.sb_mtx);
+
+       if (unsplice) {
+               if (sockdgram)
+                       solock(so);
                sounsplice(so, sosp, 0);
+               if (sockdgram)
+                       sounlock(so);
+
                return (0);
        }
        if (timerisset(&so->so_idletv))
                timeout_add_tv(&so->so_idleto, &so->so_idletv);
        return (1);
 }
-
 #endif /* SOCKET_SPLICE */
 
 void
@@ -1839,22 +1976,16 @@ sorwakeup(struct socket *so)
                soassertlocked_readonly(so);
 
 #ifdef SOCKET_SPLICE
-       if (so->so_rcv.sb_flags & SB_SPLICE) {
-               /*
-                * TCP has a sendbuffer that can handle multiple packets
-                * at once.  So queue the stream a bit to accumulate data.
-                * The sosplice thread will call somove() later and send
-                * the packets calling tcp_output() only once.
-                * In the UDP case, send out the packets immediately.
-                * Using a thread would make things slower.
-                */
-               if (so->so_proto->pr_flags & PR_WANTRCVD)
+       if (so->so_proto->pr_flags & PR_SPLICE) {
+               sb_mtx_lock(&so->so_rcv);
+               if (so->so_rcv.sb_flags & SB_SPLICE)
                        task_add(sosplice_taskq, &so->so_splicetask);
-               else
-                       somove(so, M_DONTWAIT);
+               if (isspliced(so)) {
+                       sb_mtx_unlock(&so->so_rcv);
+                       return;
+               }
+               sb_mtx_unlock(&so->so_rcv);
        }
-       if (isspliced(so))
-               return;
 #endif
        sowakeup(so, &so->so_rcv);
        if (so->so_upcall)
@@ -1868,10 +1999,17 @@ sowwakeup(struct socket *so)
                soassertlocked_readonly(so);
 
 #ifdef SOCKET_SPLICE
-       if (so->so_snd.sb_flags & SB_SPLICE)
-               task_add(sosplice_taskq, &so->so_sp->ssp_soback->so_splicetask);
-       if (issplicedback(so))
-               return;
+       if (so->so_proto->pr_flags & PR_SPLICE) {
+               sb_mtx_lock(&so->so_snd);
+               if (so->so_snd.sb_flags & SB_SPLICE)
+                       task_add(sosplice_taskq,
+                           &so->so_sp->ssp_soback->so_splicetask);
+               if (issplicedback(so)) {
+                       sb_mtx_unlock(&so->so_snd);
+                       return;
+               }
+               sb_mtx_unlock(&so->so_snd);
+       }
 #endif
        sowakeup(so, &so->so_snd);
 }
index 0e4002a..15a048e 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: udp_usrreq.c,v 1.322 2024/07/19 15:41:58 bluhm Exp $  */
+/*     $OpenBSD: udp_usrreq.c,v 1.323 2024/07/20 17:26:19 mvs Exp $    */
 /*     $NetBSD: udp_usrreq.c,v 1.28 1996/03/16 23:54:03 christos Exp $ */
 
 /*
@@ -1209,6 +1209,11 @@ udp_send(struct socket *so, struct mbuf *m, struct mbuf *addr,
 
        soassertlocked_readonly(so);
 
+       if (inp == NULL) {
+               /* PCB could be destroyed, but socket still spliced. */
+               return (EINVAL);
+       }
+
 #ifdef PIPEX
        if (inp->inp_pipex) {
                struct pipex_session *session;
index d116c07..c2a36ad 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: socketvar.h,v 1.132 2024/07/12 17:20:18 mvs Exp $     */
+/*     $OpenBSD: socketvar.h,v 1.133 2024/07/20 17:26:19 mvs Exp $     */
 /*     $NetBSD: socketvar.h,v 1.18 1996/02/09 18:25:38 christos Exp $  */
 
 /*-
@@ -51,6 +51,33 @@ typedef      __socklen_t     socklen_t;      /* length type for network syscalls */
 
 TAILQ_HEAD(soqhead, socket);
 
+/*
+ * Locks used to protect global data and struct members:
+ *     I       immutable after creation
+ *     mr      sb_mxt of so_rcv buffer
+ *     ms      sb_mtx of so_snd buffer
+ *     br      sblock() of so_rcv buffer
+ *     bs      sblock() od so_snd buffer
+ *     s       solock()
+ */
+
+/*
+ * XXXSMP: tcp(4) sockets rely on exclusive solock() for all the cases.
+ */
+
+/*
+ * Variables for socket splicing, allocated only when needed.
+ */
+struct sosplice {
+       struct  socket *ssp_socket;     /* [mr ms] send data to drain socket */
+       struct  socket *ssp_soback;     /* [ms ms] back ref to source socket */
+       off_t   ssp_len;                /* [mr] number of bytes spliced */
+       off_t   ssp_max;                /* [I] maximum number of bytes */
+       struct  timeval ssp_idletv;     /* [I] idle timeout */
+       struct  timeout ssp_idleto;
+       struct  task ssp_task;          /* task for somove */
+};
+
 /*
  * Kernel structure per socket.
  * Contains send and receive buffer queues,
@@ -89,18 +116,8 @@ struct socket {
        short   so_timeo;               /* connection timeout */
        u_long  so_oobmark;             /* chars to oob mark */
        u_int   so_error;               /* error affecting connection */
-/*
- * Variables for socket splicing, allocated only when needed.
- */
-       struct sosplice {
-               struct  socket *ssp_socket;     /* send data to drain socket */
-               struct  socket *ssp_soback;     /* back ref to source socket */
-               off_t   ssp_len;                /* number of bytes spliced */
-               off_t   ssp_max;                /* maximum number of bytes */
-               struct  timeval ssp_idletv;     /* idle timeout */
-               struct  timeout ssp_idleto;
-               struct  task ssp_task;          /* task for somove */
-       } *so_sp;
+
+       struct sosplice *so_sp;         /* [s br] */
 /*
  * Variables for socket buffering.
  */
@@ -330,6 +347,12 @@ int sblock(struct sockbuf *, int);
 /* release lock on sockbuf sb */
 void sbunlock(struct sockbuf *);
 
+static inline void
+sbassertlocked(struct sockbuf *sb)
+{
+       rw_assert_wrlock(&sb->sb_lock);
+}
+
 #define        SB_EMPTY_FIXUP(sb) do {                                         \
        if ((sb)->sb_mb == NULL) {                                      \
                (sb)->sb_mbtail = NULL;                                 \