From: claudio Date: Fri, 2 Sep 2022 13:04:16 +0000 (+0000) Subject: Rework the rsync proc code. Use a proper queue of requests and enforce X-Git-Url: http://artulab.com/gitweb/?a=commitdiff_plain;h=3e9f5857a4682097f0d54547c9b3ce5d65b4925a;p=openbsd Rework the rsync proc code. Use a proper queue of requests and enforce the limit on that queue instead of stopping to read new messages. This is needed to implement an abort request. "There is not enough RB_TREE in this diff" tb@ --- diff --git a/usr.sbin/rpki-client/rsync.c b/usr.sbin/rpki-client/rsync.c index 6fc689d3f13..5c66349ee05 100644 --- a/usr.sbin/rpki-client/rsync.c +++ b/usr.sbin/rpki-client/rsync.c @@ -1,4 +1,4 @@ -/* $OpenBSD: rsync.c,v 1.41 2022/08/09 09:02:26 claudio Exp $ */ +/* $OpenBSD: rsync.c,v 1.42 2022/09/02 13:04:16 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -41,12 +41,17 @@ * We can have multiple of these simultaneously and need to keep track * of which process maps to which request. */ -struct rsyncproc { - char *uri; /* uri of this rsync proc */ - unsigned int id; /* identity of request */ - pid_t pid; /* pid of process or 0 if unassociated */ +struct rsync { + TAILQ_ENTRY(rsync) entry; + char *uri; /* uri of this rsync proc */ + char *dst; /* destination directory */ + char *compdst; /* compare against directory */ + unsigned int id; /* identity of request */ + pid_t pid; /* pid of process or 0 if unassociated */ }; +static TAILQ_HEAD(, rsync) states = TAILQ_HEAD_INITIALIZER(states); + /* * Return the base of a rsync URI (rsync://hostname/module). The * caRepository provided by the RIR CAs point deeper than they should @@ -124,6 +129,83 @@ rsync_fixup_dest(char *destdir, char *compdir) return fn; } +static pid_t +exec_rsync(const char *prog, const char *bind_addr, char *uri, char *dst, + char *compdst) +{ + pid_t pid; + char *args[32]; + char *reldst; + int i; + + + if ((pid = fork()) == -1) + err(1, "fork"); + + if (pid == 0) { + if (pledge("stdio exec", NULL) == -1) + err(1, "pledge"); + i = 0; + args[i++] = (char *)prog; + args[i++] = "-rt"; + args[i++] = "--no-motd"; + args[i++] = "--max-size=" STRINGIFY(MAX_FILE_SIZE); + args[i++] = "--contimeout=" STRINGIFY(MAX_CONN_TIMEOUT); + args[i++] = "--timeout=" STRINGIFY(MAX_IO_TIMEOUT); + args[i++] = "--include=*/"; + args[i++] = "--include=*.cer"; + args[i++] = "--include=*.crl"; + args[i++] = "--include=*.gbr"; + args[i++] = "--include=*.mft"; + args[i++] = "--include=*.roa"; + args[i++] = "--include=*.asa"; + args[i++] = "--exclude=*"; + if (bind_addr != NULL) { + args[i++] = "--address"; + args[i++] = (char *)bind_addr; + } + if (compdst != NULL && + (reldst = rsync_fixup_dest(dst, compdst)) != NULL) { + args[i++] = "--compare-dest"; + args[i++] = reldst; + } + args[i++] = uri; + args[i++] = dst; + args[i] = NULL; + /* XXX args overflow not prevented */ + execvp(args[0], args); + err(1, "%s: execvp", prog); + } + + return pid; +} + +static void +rsync_new(unsigned int id, char *uri, char *dst, char *compdst) +{ + struct rsync *s; + + if ((s = calloc(1, sizeof(*s))) == NULL) + err(1, NULL); + + s->id = id; + s->uri = uri; + s->dst = dst; + s->compdst = compdst; + + TAILQ_INSERT_TAIL(&states, s, entry); +} + +static void +rsync_free(struct rsync *s) +{ + TAILQ_REMOVE(&states, s, entry); + free(s->uri); + free(s->dst); + free(s->compdst); + free(s); +} + static void proc_child(int signal) { @@ -141,13 +223,12 @@ proc_child(int signal) void proc_rsync(char *prog, char *bind_addr, int fd) { - size_t i, nprocs = 0; - int rc = 0; + int nprocs = 0, npending = 0, rc = 0; struct pollfd pfd; struct msgbuf msgq; struct ibuf *b, *inbuf = NULL; sigset_t mask, oldmask; - struct rsyncproc ids[MAX_RSYNC_REQUESTS] = { 0 }; + struct rsync *s, *ns; if (pledge("stdio rpath proc exec unveil", NULL) == -1) err(1, "pledge"); @@ -211,11 +292,23 @@ proc_rsync(char *prog, char *bind_addr, int fd) int st; pfd.events = 0; - if (nprocs < MAX_RSYNC_REQUESTS) - pfd.events |= POLLIN; + pfd.events |= POLLIN; if (msgq.queued) pfd.events |= POLLOUT; + if (npending > 0 && nprocs < MAX_RSYNC_REQUESTS) { + TAILQ_FOREACH(s, &states, entry) { + if (s->pid == 0) { + s->pid = exec_rsync(prog, bind_addr, + s->uri, s->dst, s->compdst); + if (++nprocs >= MAX_RSYNC_REQUESTS) + break; + if (--npending == 0) + break; + } + } + } + if (ppoll(&pfd, 1, NULL, &oldmask) == -1) { if (errno != EINTR) err(1, "ppoll"); @@ -230,36 +323,33 @@ proc_rsync(char *prog, char *bind_addr, int fd) while ((pid = waitpid(WAIT_ANY, &st, WNOHANG)) > 0) { int ok = 1; - for (i = 0; i < MAX_RSYNC_REQUESTS; i++) - if (ids[i].pid == pid) + TAILQ_FOREACH(s, &states, entry) + if (s->pid == pid) break; - if (i >= MAX_RSYNC_REQUESTS) + if (s == NULL) errx(1, "waitpid: %d unexpected", pid); if (!WIFEXITED(st)) { warnx("rsync %s terminated abnormally", - ids[i].uri); + s->uri); rc = 1; ok = 0; } else if (WEXITSTATUS(st) != 0) { - warnx("rsync %s failed", ids[i].uri); + warnx("rsync %s failed", s->uri); ok = 0; } b = io_new_buffer(); - io_simple_buffer(b, &ids[i].id, - sizeof(ids[i].id)); + io_simple_buffer(b, &s->id, sizeof(s->id)); io_simple_buffer(b, &ok, sizeof(ok)); io_close_buffer(&msgq, b); - free(ids[i].uri); - ids[i].uri = NULL; - ids[i].pid = 0; - ids[i].id = 0; + rsync_free(s); nprocs--; } if (pid == -1 && errno != ECHILD) err(1, "waitpid"); + continue; } @@ -278,8 +368,6 @@ proc_rsync(char *prog, char *bind_addr, int fd) if (!(pfd.revents & POLLIN)) continue; - if (nprocs >= MAX_RSYNC_REQUESTS) - continue; b = io_buf_read(fd, &inbuf); if (b == NULL) @@ -293,75 +381,28 @@ proc_rsync(char *prog, char *bind_addr, int fd) ibuf_free(b); - assert(dst); - assert(uri); - - /* Run process itself, wait for exit, check error. */ - - if ((pid = fork()) == -1) - err(1, "fork"); - - if (pid == 0) { - char *args[32]; - char *reldst; - - if (pledge("stdio exec", NULL) == -1) - err(1, "pledge"); - i = 0; - args[i++] = (char *)prog; - args[i++] = "-rt"; - args[i++] = "--no-motd"; - args[i++] = "--max-size=" STRINGIFY(MAX_FILE_SIZE); - args[i++] = "--contimeout=" STRINGIFY(MAX_CONN_TIMEOUT); - args[i++] = "--timeout=" STRINGIFY(MAX_IO_TIMEOUT); - args[i++] = "--include=*/"; - args[i++] = "--include=*.cer"; - args[i++] = "--include=*.crl"; - args[i++] = "--include=*.gbr"; - args[i++] = "--include=*.mft"; - args[i++] = "--include=*.roa"; - args[i++] = "--include=*.asa"; - args[i++] = "--exclude=*"; - if (bind_addr != NULL) { - args[i++] = "--address"; - args[i++] = (char *)bind_addr; - } - if (compdst != NULL && - (reldst = rsync_fixup_dest(dst, compdst)) != NULL) { - args[i++] = "--compare-dest"; - args[i++] = reldst; + if (dst != NULL) { + rsync_new(id, uri, dst, compdst); + npending++; + } else { + TAILQ_FOREACH(s, &states, entry) + if (s->id == id) + break; + if (s != NULL) { + if (s->pid != 0) + kill(s->pid, SIGTERM); + else + rsync_free(s); } - args[i++] = uri; - args[i++] = dst; - args[i] = NULL; - /* XXX args overflow not prevented */ - execvp(args[0], args); - err(1, "%s: execvp", prog); } - - /* Augment the list of running processes. */ - - for (i = 0; i < MAX_RSYNC_REQUESTS; i++) - if (ids[i].pid == 0) - break; - assert(i < MAX_RSYNC_REQUESTS); - ids[i].id = id; - ids[i].pid = pid; - ids[i].uri = uri; - nprocs++; - - /* Clean up temporary values. */ - - free(dst); - free(compdst); } /* No need for these to be hanging around. */ - for (i = 0; i < MAX_RSYNC_REQUESTS; i++) - if (ids[i].pid != 0) { - kill(ids[i].pid, SIGTERM); - free(ids[i].uri); - } + TAILQ_FOREACH_SAFE(s, &states, entry, ns) { + if (s->pid != 0) + kill(s->pid, SIGTERM); + rsync_free(s); + } msgbuf_clear(&msgq); exit(rc);