From bafdc581e0ec3dfa25b342131d5b90c356057e63 Mon Sep 17 00:00:00 2001 From: mvs Date: Fri, 19 Nov 2021 17:07:10 +0000 Subject: [PATCH] Add and enable new 'unsendrecvthr' test which performs multithreaded writes on unix(4) sockets. The receiver should be sure no data corruption or loss. ok bluhm@ --- regress/sys/kern/Makefile | 5 +- regress/sys/kern/unsendrecvthr/Makefile | 22 ++ .../sys/kern/unsendrecvthr/unsendrecvthr.c | 230 ++++++++++++++++++ 3 files changed, 255 insertions(+), 2 deletions(-) create mode 100644 regress/sys/kern/unsendrecvthr/Makefile create mode 100644 regress/sys/kern/unsendrecvthr/unsendrecvthr.c diff --git a/regress/sys/kern/Makefile b/regress/sys/kern/Makefile index 573ddbf4608..e75259ce3cd 100644 --- a/regress/sys/kern/Makefile +++ b/regress/sys/kern/Makefile @@ -1,4 +1,4 @@ -# $OpenBSD: Makefile,v 1.91 2021/11/13 08:25:42 anton Exp $ +# $OpenBSD: Makefile,v 1.92 2021/11/19 17:07:10 mvs Exp $ SUBDIR+= __syscall SUBDIR+= accept access @@ -22,7 +22,8 @@ SUBDIR+= setuid .endif SUBDIR+= signal sosplice stackjmp stackpivot syscall syscall_segment SUBDIR+= sysvmsg sysvsem sysvshm -SUBDIR+= unalign unfdpass unfdpassfail ungc unixsock unveil unveil-unmount +SUBDIR+= unalign unfdpass unfdpassfail ungc unsendrecvthr unixsock unveil +SUBDIR+= unveil-unmount SUBDIR+= wait install: diff --git a/regress/sys/kern/unsendrecvthr/Makefile b/regress/sys/kern/unsendrecvthr/Makefile new file mode 100644 index 00000000000..9a808c50cd6 --- /dev/null +++ b/regress/sys/kern/unsendrecvthr/Makefile @@ -0,0 +1,22 @@ +# $OpenBSD: Makefile,v 1.1 2021/11/19 17:07:10 mvs Exp $ + +# Copyright (c) 2021 Makkoveev Vitaliy +# +# Permission to use, copy, modify, and distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +WARNINGS = yes + +PROG = unsendrecvthr +LDADD+= -lpthread + +.include diff --git a/regress/sys/kern/unsendrecvthr/unsendrecvthr.c b/regress/sys/kern/unsendrecvthr/unsendrecvthr.c new file mode 100644 index 00000000000..45e81c051c6 --- /dev/null +++ b/regress/sys/kern/unsendrecvthr/unsendrecvthr.c @@ -0,0 +1,230 @@ +/* $OpenBSD: unsendrecvthr.c,v 1.1 2021/11/19 17:07:10 mvs Exp $ */ + +/* + * Copyright (c) 2021 Vitaliy Makkoveev + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +/* + * Create the pair of SOCK_SEQPACKET sockets and perform #count_of_cpus + * simultaneous writes on each of them. In half of transmissions the + * sockets will be re-locked in kernel space. Be sure no data corruption + * or loss. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static pthread_mutex_t therr_mtx = PTHREAD_MUTEX_INITIALIZER; + +static void +therr(int eval, const char *fmt, ...) +{ + va_list ap; + + pthread_mutex_lock(&therr_mtx); + + va_start(ap, fmt); + verr(eval, fmt, ap); + va_end(ap); +} + +static void +therrx(int eval, const char *fmt, ...) +{ + va_list ap; + + pthread_mutex_lock(&therr_mtx); + + va_start(ap, fmt); + verrx(eval, fmt, ap); + va_end(ap); +} + +static void +therrc(int eval, int code, const char *fmt, ...) +{ + va_list ap; + + pthread_mutex_lock(&therr_mtx); + + va_start(ap, fmt); + verrc(eval, code, fmt, ap); + va_end(ap); +} + +struct data { + int id; + unsigned int cnt; +}; + +struct thr_tx_arg { + int s; + int id; +}; + +struct rx_data { + unsigned int cnt; +}; + +struct thr_rx_arg { + int s; + int rx_data_num; + struct rx_data *rx_data; +}; + +static void * +thr_tx(void *arg) +{ + struct data data; + int s = ((struct thr_tx_arg *)arg)->s; + + data.id = ((struct thr_tx_arg *)arg)->id; + data.cnt = 1; + + while (1) { + ssize_t ret; + + if ((ret = send(s, &data, sizeof(data), 0)) < 0) + therr(1, "send"); + if (ret != sizeof(data)) + therrx(1, "send: wrong data size"); + + data.cnt++; + } + + return NULL; +} + +static void * +thr_rx(void *arg) +{ + int s = ((struct thr_rx_arg *)arg)->s; + int rx_data_num = ((struct thr_rx_arg *)arg)->rx_data_num; + struct rx_data *rx_data = ((struct thr_rx_arg *)arg)->rx_data; + + while (1) { + struct data data; + ssize_t ret; + + if ((ret = recv(s, &data, sizeof(data), 0)) < 0) + therr(1, "recv"); + if (ret != sizeof(data)) + therrx(1, "recv: wrong data size"); + + if (data.id >= rx_data_num) + therrx(1, "recv: wrong id"); + + if (data.cnt != (unsigned int)(rx_data[data.id].cnt + 1)) { + therrx(1, "recv: data loss %d -> %d", + rx_data[data.id].cnt, data.cnt); + } + rx_data[data.id].cnt = data.cnt; + } + + return NULL; +} + +int +main(int argc, char *argv[]) +{ + struct timeval testtime = { + .tv_sec = 60, + .tv_usec = 0, + }; + struct timeval *tv = &testtime; + + int mib[2], ncpu; + size_t len; + + struct rx_data *rx_data[2]; + struct thr_rx_arg rx_args[2]; + struct thr_tx_arg *tx_args[2]; + + int s[2], i, j; + + if (argc == 2 && !strcmp(argv[1], "--infinite")) + tv = NULL; + + mib[0] = CTL_HW; + mib[1] = HW_NCPUONLINE; + len = sizeof(ncpu); + + if (sysctl(mib, 2, &ncpu, &len, NULL, 0) < 0) + err(1, "sysctl"); + if (ncpu <= 0) + errx(1, "Wrong number of CPUs online: %d", ncpu); + + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, s) < 0) + err(1, "socketpair"); + + for (i = 0; i < 2; ++i) { + if (!(rx_data[i] = calloc(ncpu, sizeof(*rx_data)))) + err(1, "calloc"); + + for (j = 0; j < ncpu; ++j) + rx_data[i][j].cnt = 0; + } + + for (i = 0; i < 2; ++i) { + rx_args[i].s = s[i]; + rx_args[i].rx_data_num = ncpu; + rx_args[i].rx_data = rx_data[i]; + } + + for (i = 0; i < 2; ++i) { + if (!(tx_args[i] = calloc(ncpu, sizeof(*tx_args)))) + err(1, "calloc"); + + for (j = 0; j < ncpu; ++j) { + tx_args[i][j].s = s[i]; + tx_args[i][j].id = j; + } + } + + for (i = 0; i < 2; ++i) { + pthread_t thr; + int error; + + error = pthread_create(&thr, NULL, thr_rx, &rx_args[i]); + if (error) + therrc(1, error, "pthread_create"); + } + + for (i = 0; i < 2; ++i) { + pthread_t thr; + int error; + + for (j = 0; j < ncpu; ++j) { + error = pthread_create(&thr, NULL, + thr_tx, &tx_args[i][j]); + if (error) + therrc(1, error, "pthread_create"); + } + } + + select(0, NULL, NULL, NULL, tv); + + return 0; +} -- 2.20.1