Define the msgbuf queues globably. Clean up the code since rsyncq and procq
authorclaudio <claudio@openbsd.org>
Thu, 4 Feb 2021 13:38:27 +0000 (13:38 +0000)
committerclaudio <claudio@openbsd.org>
Thu, 4 Feb 2021 13:38:27 +0000 (13:38 +0000)
no longer need to be passed all the way down anymore. Shuffle code a bit
to bring it into more order.
OK tb@

usr.sbin/rpki-client/main.c

index cb67b09..3445289 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: main.c,v 1.95 2021/02/04 09:57:37 claudio Exp $ */
+/*     $OpenBSD: main.c,v 1.96 2021/02/04 13:38:27 claudio Exp $ */
 /*
  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
  *
@@ -114,10 +114,9 @@ filepathcmp(struct filepath *a, struct filepath *b)
 
 RB_HEAD(filepath_tree, filepath);
 RB_PROTOTYPE(filepath_tree, filepath, entry, filepathcmp);
-struct filepath_tree  fpt = RB_INITIALIZER(&fpt);
 
-static void    entityq_flush(struct msgbuf *, struct entityq *,
-                   const struct repo *);
+static struct filepath_tree    fpt = RB_INITIALIZER(&fpt);
+static struct msgbuf           procq, rsyncq;
 
 const char     *bird_tablename = "ROAS";
 
@@ -204,34 +203,23 @@ entity_read_req(int fd, struct entity *ent)
 }
 
 /*
- * Like entity_write_req() but into a buffer.
+ * Write the queue entity.
  * Matched by entity_read_req().
  */
 static void
-entity_buffer_req(struct ibuf *b, const struct entity *ent)
+entity_write_req(const struct entity *ent)
 {
+       struct ibuf *b;
 
+       if ((b = ibuf_dynamic(sizeof(*ent), UINT_MAX)) == NULL)
+               err(1, NULL);
        io_simple_buffer(b, &ent->type, sizeof(ent->type));
        io_str_buffer(b, ent->uri);
        io_simple_buffer(b, &ent->has_pkey, sizeof(int));
        if (ent->has_pkey)
                io_buf_buffer(b, ent->pkey, ent->pkeysz);
        io_str_buffer(b, ent->descr);
-}
-
-/*
- * Write the queue entity.
- * Simply a wrapper around entity_buffer_req().
- */
-static void
-entity_write_req(struct msgbuf *msgq, const struct entity *ent)
-{
-       struct ibuf *b;
-
-       if ((b = ibuf_dynamic(sizeof(*ent), UINT_MAX)) == NULL)
-               err(1, NULL);
-       entity_buffer_req(b, ent);
-       ibuf_close(msgq, b);
+       ibuf_close(&procq, b);
 }
 
 /*
@@ -239,24 +227,67 @@ entity_write_req(struct msgbuf *msgq, const struct entity *ent)
  * repo, then flush those into the parser process.
  */
 static void
-entityq_flush(struct msgbuf *msgq, struct entityq *q, const struct repo *repo)
+entityq_flush(struct entityq *q, const struct repo *repo)
 {
        struct entity   *p, *np;
 
        TAILQ_FOREACH_SAFE(p, q, entries, np) {
                if (p->repo < 0 || repo->id != (size_t)p->repo)
                        continue;
-               entity_write_req(msgq, p);
+               entity_write_req(p);
                TAILQ_REMOVE(q, p, entries);
                entity_free(p);
        }
 }
 
+/*
+ * Add the heap-allocated file to the queue for processing.
+ */
+static void
+entityq_add(struct entityq *q, char *file, enum rtype type,
+    const struct repo *rp, const unsigned char *pkey, size_t pkeysz,
+    char *descr)
+{
+       struct entity   *p;
+
+       if ((p = calloc(1, sizeof(struct entity))) == NULL)
+               err(1, "calloc");
+
+       p->type = type;
+       p->uri = file;
+       p->repo = (rp != NULL) ? (ssize_t)rp->id : -1;
+       p->has_pkey = pkey != NULL;
+       if (p->has_pkey) {
+               p->pkeysz = pkeysz;
+               if ((p->pkey = malloc(pkeysz)) == NULL)
+                       err(1, "malloc");
+               memcpy(p->pkey, pkey, pkeysz);
+       }
+       if (descr != NULL)
+               if ((p->descr = strdup(descr)) == NULL)
+                       err(1, "strdup");
+
+       filepath_add(file);
+
+       entity_queue++;
+
+       /*
+        * Write to the queue if there's no repo or the repo has already
+        * been loaded else enqueue it for later.
+        */
+
+       if (rp == NULL || rp->loaded) {
+               entity_write_req(p);
+               entity_free(p);
+       } else
+               TAILQ_INSERT_TAIL(q, p, entries);
+}
+
 /*
  * Look up a repository, queueing it for discovery if not found.
  */
 static const struct repo *
-repo_lookup(struct msgbuf *msgq, const char *uri)
+repo_lookup(const char *uri)
 {
        const char      *host, *mod;
        size_t           hostsz, modsz, i;
@@ -303,7 +334,7 @@ repo_lookup(struct msgbuf *msgq, const char *uri)
                io_simple_buffer(b, &rp->id, sizeof(rp->id));
                io_str_buffer(b, local);
                io_str_buffer(b, rp->repo);
-               ibuf_close(msgq, b);
+               ibuf_close(&rsyncq, b);
                free(local);
        } else {
                rp->loaded = 1;
@@ -328,55 +359,12 @@ repo_filename(const struct repo *repo, const char *uri)
        return nfile;
 }
 
-/*
- * Add the heap-allocated file to the queue for processing.
- */
-static void
-entityq_add(struct msgbuf *msgq, struct entityq *q, char *file, enum rtype type,
-    const struct repo *rp, const unsigned char *pkey, size_t pkeysz,
-    char *descr)
-{
-       struct entity   *p;
-
-       if ((p = calloc(1, sizeof(struct entity))) == NULL)
-               err(1, "calloc");
-
-       p->type = type;
-       p->uri = file;
-       p->repo = (rp != NULL) ? (ssize_t)rp->id : -1;
-       p->has_pkey = pkey != NULL;
-       if (p->has_pkey) {
-               p->pkeysz = pkeysz;
-               if ((p->pkey = malloc(pkeysz)) == NULL)
-                       err(1, "malloc");
-               memcpy(p->pkey, pkey, pkeysz);
-       }
-       if (descr != NULL)
-               if ((p->descr = strdup(descr)) == NULL)
-                       err(1, "strdup");
-
-       filepath_add(file);
-
-       entity_queue++;
-
-       /*
-        * Write to the queue if there's no repo or the repo has already
-        * been loaded else enqueue it for later.
-        */
-
-       if (rp == NULL || rp->loaded) {
-               entity_write_req(msgq, p);
-               entity_free(p);
-       } else
-               TAILQ_INSERT_TAIL(q, p, entries);
-}
-
 /*
  * Add a file (CER, ROA, CRL) from an MFT file, RFC 6486.
  * These are always relative to the directory in which "mft" sits.
  */
 static void
-queue_add_from_mft(struct msgbuf *msgq, struct entityq *q, const char *mft,
+queue_add_from_mft(struct entityq *q, const char *mft,
     const struct mftfile *file, enum rtype type)
 {
        char            *cp, *nfile;
@@ -393,7 +381,7 @@ queue_add_from_mft(struct msgbuf *msgq, struct entityq *q, const char *mft,
         * that the repository has already been loaded.
         */
 
-       entityq_add(msgq, q, nfile, type, NULL, NULL, 0, NULL);
+       entityq_add(q, nfile, type, NULL, NULL, 0, NULL);
 }
 
 /*
@@ -405,8 +393,7 @@ queue_add_from_mft(struct msgbuf *msgq, struct entityq *q, const char *mft,
  * check the suffix anyway).
  */
 static void
-queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q,
-    const struct mft *mft)
+queue_add_from_mft_set(struct entityq *q, const struct mft *mft)
 {
        size_t                   i, sz;
        const struct mftfile    *f;
@@ -417,7 +404,7 @@ queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q,
                assert(sz > 4);
                if (strcasecmp(f->file + sz - 4, ".crl"))
                        continue;
-               queue_add_from_mft(msgq, q, mft->file, f, RTYPE_CRL);
+               queue_add_from_mft(q, mft->file, f, RTYPE_CRL);
        }
 
        for (i = 0; i < mft->filesz; i++) {
@@ -426,7 +413,7 @@ queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q,
                assert(sz > 4);
                if (strcasecmp(f->file + sz - 4, ".cer"))
                        continue;
-               queue_add_from_mft(msgq, q, mft->file, f, RTYPE_CER);
+               queue_add_from_mft(q, mft->file, f, RTYPE_CER);
        }
 
        for (i = 0; i < mft->filesz; i++) {
@@ -435,7 +422,7 @@ queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q,
                assert(sz > 4);
                if (strcasecmp(f->file + sz - 4, ".roa"))
                        continue;
-               queue_add_from_mft(msgq, q, mft->file, f, RTYPE_ROA);
+               queue_add_from_mft(q, mft->file, f, RTYPE_ROA);
        }
 
        for (i = 0; i < mft->filesz; i++) {
@@ -444,7 +431,7 @@ queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q,
                assert(sz > 4);
                if (strcasecmp(f->file + sz - 4, ".gbr"))
                        continue;
-               queue_add_from_mft(msgq, q, mft->file, f, RTYPE_GBR);
+               queue_add_from_mft(q, mft->file, f, RTYPE_GBR);
        }
 
        for (i = 0; i < mft->filesz; i++) {
@@ -464,7 +451,7 @@ queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q,
  * Add a local TAL file (RFC 7730) to the queue of files to fetch.
  */
 static void
-queue_add_tal(struct msgbuf *msgq, struct entityq *q, const char *file)
+queue_add_tal(struct entityq *q, const char *file)
 {
        char    *nfile, *buf;
 
@@ -484,7 +471,7 @@ queue_add_tal(struct msgbuf *msgq, struct entityq *q, const char *file)
        }
 
        /* Not in a repository, so directly add to queue. */
-       entityq_add(msgq, q, nfile, RTYPE_TAL, NULL, NULL, 0, buf);
+       entityq_add(q, nfile, RTYPE_TAL, NULL, NULL, 0, buf);
        /* entityq_add makes a copy of buf */
        free(buf);
 }
@@ -493,8 +480,7 @@ queue_add_tal(struct msgbuf *msgq, struct entityq *q, const char *file)
  * Add URIs (CER) from a TAL file, RFC 8630.
  */
 static void
-queue_add_from_tal(struct msgbuf *procq, struct msgbuf *rsyncq,
-    struct entityq *q, const struct tal *tal)
+queue_add_from_tal(struct entityq *q, const struct tal *tal)
 {
        char                    *nfile;
        const struct repo       *repo;
@@ -512,10 +498,10 @@ queue_add_from_tal(struct msgbuf *procq, struct msgbuf *rsyncq,
                errx(1, "TAL file has no rsync:// URI");
 
        /* Look up the repository. */
-       repo = repo_lookup(rsyncq, uri);
+       repo = repo_lookup(uri);
        nfile = repo_filename(repo, uri);
 
-       entityq_add(procq, q, nfile, RTYPE_CER, repo, tal->pkey,
+       entityq_add(q, nfile, RTYPE_CER, repo, tal->pkey,
            tal->pkeysz, tal->descr);
 }
 
@@ -523,16 +509,15 @@ queue_add_from_tal(struct msgbuf *procq, struct msgbuf *rsyncq,
  * Add a manifest (MFT) found in an X509 certificate, RFC 6487.
  */
 static void
-queue_add_from_cert(struct msgbuf *procq, struct msgbuf *rsyncq,
-    struct entityq *q, const struct cert *cert)
+queue_add_from_cert(struct entityq *q, const struct cert *cert)
 {
        const struct repo       *repo;
        char                    *nfile;
 
-       repo = repo_lookup(rsyncq, cert->mft);
+       repo = repo_lookup(cert->mft);
        nfile = repo_filename(repo, cert->mft);
 
-       entityq_add(procq, q, nfile, RTYPE_MFT, repo, NULL, 0, NULL);
+       entityq_add(q, nfile, RTYPE_MFT, repo, NULL, 0, NULL);
 }
 
 /*
@@ -542,8 +527,8 @@ queue_add_from_cert(struct msgbuf *procq, struct msgbuf *rsyncq,
  * In all cases, we gather statistics.
  */
 static void
-entity_process(int proc, struct msgbuf *procq, struct msgbuf *rsyncq,
-    struct stats *st, struct entityq *q, struct vrp_tree *tree)
+entity_process(int proc, struct stats *st, struct entityq *q,
+    struct vrp_tree *tree)
 {
        enum rtype      type;
        struct tal      *tal;
@@ -564,7 +549,7 @@ entity_process(int proc, struct msgbuf *procq, struct msgbuf *rsyncq,
        case RTYPE_TAL:
                st->tals++;
                tal = tal_read(proc);
-               queue_add_from_tal(procq, rsyncq, q, tal);
+               queue_add_from_tal(q, tal);
                tal_free(tal);
                break;
        case RTYPE_CER:
@@ -582,8 +567,7 @@ entity_process(int proc, struct msgbuf *procq, struct msgbuf *rsyncq,
                         * we're revoked and then we don't want to
                         * process the MFT.
                         */
-                       queue_add_from_cert(procq, rsyncq,
-                           q, cert);
+                       queue_add_from_cert(q, cert);
                } else
                        st->certs_invalid++;
                cert_free(cert);
@@ -598,7 +582,7 @@ entity_process(int proc, struct msgbuf *procq, struct msgbuf *rsyncq,
                mft = mft_read(proc);
                if (mft->stale)
                        st->mfts_stale++;
-               queue_add_from_mft_set(procq, q, mft);
+               queue_add_from_mft_set(q, mft);
                mft_free(mft);
                break;
        case RTYPE_CRL:
@@ -758,7 +742,6 @@ main(int argc, char *argv[])
        pid_t            procpid, rsyncpid;
        int              fd[2];
        struct entityq   q;
-       struct msgbuf    procq, rsyncq;
        struct pollfd    pfd[2];
        struct roa      **out = NULL;
        char            *rsync_prog = "openrsync";
@@ -959,7 +942,7 @@ main(int argc, char *argv[])
         */
 
        for (i = 0; i < talsz; i++)
-               queue_add_tal(&procq, &q, tals[i]);
+               queue_add_tal(&q, tals[i]);
 
        while (entity_queue > 0 && !killme) {
                pfd[0].events = POLLIN;
@@ -1035,7 +1018,7 @@ main(int argc, char *argv[])
                                logx("%s: load from network failed, "
                                    "fallback to cache", rt.repos[i].local);
                        stats.repos++;
-                       entityq_flush(&procq, &q, &rt.repos[i]);
+                       entityq_flush(&q, &rt.repos[i]);
                }
 
                /*
@@ -1044,7 +1027,7 @@ main(int argc, char *argv[])
                 */
 
                if ((pfd[1].revents & POLLIN)) {
-                       entity_process(proc, &procq, &rsyncq, &stats, &q, &v);
+                       entity_process(proc, &stats, &q, &v);
                }
        }