Rework the rsync proc code. Use a proper queue of requests and enforce
authorclaudio <claudio@openbsd.org>
Fri, 2 Sep 2022 13:04:16 +0000 (13:04 +0000)
committerclaudio <claudio@openbsd.org>
Fri, 2 Sep 2022 13:04:16 +0000 (13:04 +0000)
the limit on that queue instead of stopping to read new messages.
This is needed to implement an abort request.
"There is not enough RB_TREE in this diff" tb@

usr.sbin/rpki-client/rsync.c

index 6fc689d..5c66349 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: rsync.c,v 1.41 2022/08/09 09:02:26 claudio Exp $ */
+/*     $OpenBSD: rsync.c,v 1.42 2022/09/02 13:04:16 claudio Exp $ */
 /*
  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
  *
  * We can have multiple of these simultaneously and need to keep track
  * of which process maps to which request.
  */
-struct rsyncproc {
-       char            *uri; /* uri of this rsync proc */
-       unsigned int     id; /* identity of request */
-       pid_t            pid; /* pid of process or 0 if unassociated */
+struct rsync {
+       TAILQ_ENTRY(rsync)       entry;
+       char                    *uri; /* uri of this rsync proc */
+       char                    *dst; /* destination directory */
+       char                    *compdst; /* compare against directory */
+       unsigned int             id; /* identity of request */
+       pid_t                    pid; /* pid of process or 0 if unassociated */
 };
 
+static TAILQ_HEAD(, rsync)     states = TAILQ_HEAD_INITIALIZER(states);
+
 /*
  * Return the base of a rsync URI (rsync://hostname/module). The
  * caRepository provided by the RIR CAs point deeper than they should
@@ -124,6 +129,83 @@ rsync_fixup_dest(char *destdir, char *compdir)
        return fn;
 }
 
+static pid_t
+exec_rsync(const char *prog, const char *bind_addr, char *uri, char *dst,
+    char *compdst)
+{
+       pid_t pid;
+       char *args[32];
+       char *reldst;
+       int i;
+
+
+       if ((pid = fork()) == -1)
+               err(1, "fork");
+
+       if (pid == 0) {
+               if (pledge("stdio exec", NULL) == -1)
+                       err(1, "pledge");
+               i = 0;
+               args[i++] = (char *)prog;
+               args[i++] = "-rt";
+               args[i++] = "--no-motd";
+               args[i++] = "--max-size=" STRINGIFY(MAX_FILE_SIZE);
+               args[i++] = "--contimeout=" STRINGIFY(MAX_CONN_TIMEOUT);
+               args[i++] = "--timeout=" STRINGIFY(MAX_IO_TIMEOUT);
+               args[i++] = "--include=*/";
+               args[i++] = "--include=*.cer";
+               args[i++] = "--include=*.crl";
+               args[i++] = "--include=*.gbr";
+               args[i++] = "--include=*.mft";
+               args[i++] = "--include=*.roa";
+               args[i++] = "--include=*.asa";
+               args[i++] = "--exclude=*";
+               if (bind_addr != NULL) {
+                       args[i++] = "--address";
+                       args[i++] = (char *)bind_addr;
+               }
+               if (compdst != NULL &&
+                   (reldst = rsync_fixup_dest(dst, compdst)) != NULL) {
+                       args[i++] = "--compare-dest";
+                       args[i++] = reldst;
+               }
+               args[i++] = uri;
+               args[i++] = dst;
+               args[i] = NULL;
+               /* XXX args overflow not prevented */
+               execvp(args[0], args);
+               err(1, "%s: execvp", prog);
+       }
+
+       return pid;
+}
+
+static void
+rsync_new(unsigned int id, char *uri, char *dst, char *compdst)
+{
+       struct rsync *s;
+
+       if ((s = calloc(1, sizeof(*s))) == NULL)
+               err(1, NULL);
+
+       s->id = id;
+       s->uri = uri;
+       s->dst = dst;
+       s->compdst = compdst;
+
+       TAILQ_INSERT_TAIL(&states, s, entry);
+}
+
+static void
+rsync_free(struct rsync *s)
+{
+       TAILQ_REMOVE(&states, s, entry);
+       free(s->uri);
+       free(s->dst);
+       free(s->compdst);
+       free(s);
+}
+
 static void
 proc_child(int signal)
 {
@@ -141,13 +223,12 @@ proc_child(int signal)
 void
 proc_rsync(char *prog, char *bind_addr, int fd)
 {
-       size_t                   i, nprocs = 0;
-       int                      rc = 0;
+       int                      nprocs = 0, npending = 0, rc = 0;
        struct pollfd            pfd;
        struct msgbuf            msgq;
        struct ibuf             *b, *inbuf = NULL;
        sigset_t                 mask, oldmask;
-       struct rsyncproc         ids[MAX_RSYNC_REQUESTS] = { 0 };
+       struct rsync            *s, *ns;
 
        if (pledge("stdio rpath proc exec unveil", NULL) == -1)
                err(1, "pledge");
@@ -211,11 +292,23 @@ proc_rsync(char *prog, char *bind_addr, int fd)
                int st;
 
                pfd.events = 0;
-               if (nprocs < MAX_RSYNC_REQUESTS)
-                       pfd.events |= POLLIN;
+               pfd.events |= POLLIN;
                if (msgq.queued)
                        pfd.events |= POLLOUT;
 
+               if (npending > 0 && nprocs < MAX_RSYNC_REQUESTS) {
+                       TAILQ_FOREACH(s, &states, entry) {
+                               if (s->pid == 0) {
+                                       s->pid = exec_rsync(prog, bind_addr,
+                                           s->uri, s->dst, s->compdst);
+                                       if (++nprocs >= MAX_RSYNC_REQUESTS)
+                                               break;
+                                       if (--npending == 0)
+                                               break;
+                               }
+                       }
+               }
+
                if (ppoll(&pfd, 1, NULL, &oldmask) == -1) {
                        if (errno != EINTR)
                                err(1, "ppoll");
@@ -230,36 +323,33 @@ proc_rsync(char *prog, char *bind_addr, int fd)
                        while ((pid = waitpid(WAIT_ANY, &st, WNOHANG)) > 0) {
                                int ok = 1;
 
-                               for (i = 0; i < MAX_RSYNC_REQUESTS; i++)
-                                       if (ids[i].pid == pid)
+                               TAILQ_FOREACH(s, &states, entry)
+                                       if (s->pid == pid)
                                                break;
-                               if (i >= MAX_RSYNC_REQUESTS)
+                               if (s == NULL)
                                        errx(1, "waitpid: %d unexpected", pid);
 
                                if (!WIFEXITED(st)) {
                                        warnx("rsync %s terminated abnormally",
-                                           ids[i].uri);
+                                           s->uri);
                                        rc = 1;
                                        ok = 0;
                                } else if (WEXITSTATUS(st) != 0) {
-                                       warnx("rsync %s failed", ids[i].uri);
+                                       warnx("rsync %s failed", s->uri);
                                        ok = 0;
                                }
 
                                b = io_new_buffer();
-                               io_simple_buffer(b, &ids[i].id,
-                                   sizeof(ids[i].id));
+                               io_simple_buffer(b, &s->id, sizeof(s->id));
                                io_simple_buffer(b, &ok, sizeof(ok));
                                io_close_buffer(&msgq, b);
 
-                               free(ids[i].uri);
-                               ids[i].uri = NULL;
-                               ids[i].pid = 0;
-                               ids[i].id = 0;
+                               rsync_free(s);
                                nprocs--;
                        }
                        if (pid == -1 && errno != ECHILD)
                                err(1, "waitpid");
+
                        continue;
                }
 
@@ -278,8 +368,6 @@ proc_rsync(char *prog, char *bind_addr, int fd)
 
                if (!(pfd.revents & POLLIN))
                        continue;
-               if (nprocs >= MAX_RSYNC_REQUESTS)
-                       continue;
 
                b = io_buf_read(fd, &inbuf);
                if (b == NULL)
@@ -293,75 +381,28 @@ proc_rsync(char *prog, char *bind_addr, int fd)
 
                ibuf_free(b);
 
-               assert(dst);
-               assert(uri);
-
-               /* Run process itself, wait for exit, check error. */
-
-               if ((pid = fork()) == -1)
-                       err(1, "fork");
-
-               if (pid == 0) {
-                       char *args[32];
-                       char *reldst;
-
-                       if (pledge("stdio exec", NULL) == -1)
-                               err(1, "pledge");
-                       i = 0;
-                       args[i++] = (char *)prog;
-                       args[i++] = "-rt";
-                       args[i++] = "--no-motd";
-                       args[i++] = "--max-size=" STRINGIFY(MAX_FILE_SIZE);
-                       args[i++] = "--contimeout=" STRINGIFY(MAX_CONN_TIMEOUT);
-                       args[i++] = "--timeout=" STRINGIFY(MAX_IO_TIMEOUT);
-                       args[i++] = "--include=*/";
-                       args[i++] = "--include=*.cer";
-                       args[i++] = "--include=*.crl";
-                       args[i++] = "--include=*.gbr";
-                       args[i++] = "--include=*.mft";
-                       args[i++] = "--include=*.roa";
-                       args[i++] = "--include=*.asa";
-                       args[i++] = "--exclude=*";
-                       if (bind_addr != NULL) {
-                               args[i++] = "--address";
-                               args[i++] = (char *)bind_addr;
-                       }
-                       if (compdst != NULL &&
-                           (reldst = rsync_fixup_dest(dst, compdst)) != NULL) {
-                               args[i++] = "--compare-dest";
-                               args[i++] = reldst;
+               if (dst != NULL) {
+                       rsync_new(id, uri, dst, compdst);
+                       npending++;
+               } else {
+                       TAILQ_FOREACH(s, &states, entry)
+                               if (s->id == id)
+                                       break;
+                       if (s != NULL) {
+                               if (s->pid != 0)
+                                       kill(s->pid, SIGTERM);
+                               else
+                                       rsync_free(s);
                        }
-                       args[i++] = uri;
-                       args[i++] = dst;
-                       args[i] = NULL;
-                       /* XXX args overflow not prevented */
-                       execvp(args[0], args);
-                       err(1, "%s: execvp", prog);
                }
-
-               /* Augment the list of running processes. */
-
-               for (i = 0; i < MAX_RSYNC_REQUESTS; i++)
-                       if (ids[i].pid == 0)
-                               break;
-               assert(i < MAX_RSYNC_REQUESTS);
-               ids[i].id = id;
-               ids[i].pid = pid;
-               ids[i].uri = uri;
-               nprocs++;
-
-               /* Clean up temporary values. */
-
-               free(dst);
-               free(compdst);
        }
 
        /* No need for these to be hanging around. */
-       for (i = 0; i < MAX_RSYNC_REQUESTS; i++)
-               if (ids[i].pid != 0) {
-                       kill(ids[i].pid, SIGTERM);
-                       free(ids[i].uri);
-               }
+       TAILQ_FOREACH_SAFE(s, &states, entry, ns) {
+               if (s->pid != 0)
+                       kill(s->pid, SIGTERM);
+               rsync_free(s);
+       }
 
        msgbuf_clear(&msgq);
        exit(rc);