From 5ff9ae983dfef1db28301100475d0f3cdc55996e Mon Sep 17 00:00:00 2001 From: claudio Date: Thu, 4 Feb 2021 13:38:27 +0000 Subject: [PATCH] Define the msgbuf queues globably. Clean up the code since rsyncq and procq no longer need to be passed all the way down anymore. Shuffle code a bit to bring it into more order. OK tb@ --- usr.sbin/rpki-client/main.c | 175 ++++++++++++++++-------------------- 1 file changed, 79 insertions(+), 96 deletions(-) diff --git a/usr.sbin/rpki-client/main.c b/usr.sbin/rpki-client/main.c index cb67b0951a2..344528995fe 100644 --- a/usr.sbin/rpki-client/main.c +++ b/usr.sbin/rpki-client/main.c @@ -1,4 +1,4 @@ -/* $OpenBSD: main.c,v 1.95 2021/02/04 09:57:37 claudio Exp $ */ +/* $OpenBSD: main.c,v 1.96 2021/02/04 13:38:27 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -114,10 +114,9 @@ filepathcmp(struct filepath *a, struct filepath *b) RB_HEAD(filepath_tree, filepath); RB_PROTOTYPE(filepath_tree, filepath, entry, filepathcmp); -struct filepath_tree fpt = RB_INITIALIZER(&fpt); -static void entityq_flush(struct msgbuf *, struct entityq *, - const struct repo *); +static struct filepath_tree fpt = RB_INITIALIZER(&fpt); +static struct msgbuf procq, rsyncq; const char *bird_tablename = "ROAS"; @@ -204,34 +203,23 @@ entity_read_req(int fd, struct entity *ent) } /* - * Like entity_write_req() but into a buffer. + * Write the queue entity. * Matched by entity_read_req(). */ static void -entity_buffer_req(struct ibuf *b, const struct entity *ent) +entity_write_req(const struct entity *ent) { + struct ibuf *b; + if ((b = ibuf_dynamic(sizeof(*ent), UINT_MAX)) == NULL) + err(1, NULL); io_simple_buffer(b, &ent->type, sizeof(ent->type)); io_str_buffer(b, ent->uri); io_simple_buffer(b, &ent->has_pkey, sizeof(int)); if (ent->has_pkey) io_buf_buffer(b, ent->pkey, ent->pkeysz); io_str_buffer(b, ent->descr); -} - -/* - * Write the queue entity. - * Simply a wrapper around entity_buffer_req(). - */ -static void -entity_write_req(struct msgbuf *msgq, const struct entity *ent) -{ - struct ibuf *b; - - if ((b = ibuf_dynamic(sizeof(*ent), UINT_MAX)) == NULL) - err(1, NULL); - entity_buffer_req(b, ent); - ibuf_close(msgq, b); + ibuf_close(&procq, b); } /* @@ -239,24 +227,67 @@ entity_write_req(struct msgbuf *msgq, const struct entity *ent) * repo, then flush those into the parser process. */ static void -entityq_flush(struct msgbuf *msgq, struct entityq *q, const struct repo *repo) +entityq_flush(struct entityq *q, const struct repo *repo) { struct entity *p, *np; TAILQ_FOREACH_SAFE(p, q, entries, np) { if (p->repo < 0 || repo->id != (size_t)p->repo) continue; - entity_write_req(msgq, p); + entity_write_req(p); TAILQ_REMOVE(q, p, entries); entity_free(p); } } +/* + * Add the heap-allocated file to the queue for processing. + */ +static void +entityq_add(struct entityq *q, char *file, enum rtype type, + const struct repo *rp, const unsigned char *pkey, size_t pkeysz, + char *descr) +{ + struct entity *p; + + if ((p = calloc(1, sizeof(struct entity))) == NULL) + err(1, "calloc"); + + p->type = type; + p->uri = file; + p->repo = (rp != NULL) ? (ssize_t)rp->id : -1; + p->has_pkey = pkey != NULL; + if (p->has_pkey) { + p->pkeysz = pkeysz; + if ((p->pkey = malloc(pkeysz)) == NULL) + err(1, "malloc"); + memcpy(p->pkey, pkey, pkeysz); + } + if (descr != NULL) + if ((p->descr = strdup(descr)) == NULL) + err(1, "strdup"); + + filepath_add(file); + + entity_queue++; + + /* + * Write to the queue if there's no repo or the repo has already + * been loaded else enqueue it for later. + */ + + if (rp == NULL || rp->loaded) { + entity_write_req(p); + entity_free(p); + } else + TAILQ_INSERT_TAIL(q, p, entries); +} + /* * Look up a repository, queueing it for discovery if not found. */ static const struct repo * -repo_lookup(struct msgbuf *msgq, const char *uri) +repo_lookup(const char *uri) { const char *host, *mod; size_t hostsz, modsz, i; @@ -303,7 +334,7 @@ repo_lookup(struct msgbuf *msgq, const char *uri) io_simple_buffer(b, &rp->id, sizeof(rp->id)); io_str_buffer(b, local); io_str_buffer(b, rp->repo); - ibuf_close(msgq, b); + ibuf_close(&rsyncq, b); free(local); } else { rp->loaded = 1; @@ -328,55 +359,12 @@ repo_filename(const struct repo *repo, const char *uri) return nfile; } -/* - * Add the heap-allocated file to the queue for processing. - */ -static void -entityq_add(struct msgbuf *msgq, struct entityq *q, char *file, enum rtype type, - const struct repo *rp, const unsigned char *pkey, size_t pkeysz, - char *descr) -{ - struct entity *p; - - if ((p = calloc(1, sizeof(struct entity))) == NULL) - err(1, "calloc"); - - p->type = type; - p->uri = file; - p->repo = (rp != NULL) ? (ssize_t)rp->id : -1; - p->has_pkey = pkey != NULL; - if (p->has_pkey) { - p->pkeysz = pkeysz; - if ((p->pkey = malloc(pkeysz)) == NULL) - err(1, "malloc"); - memcpy(p->pkey, pkey, pkeysz); - } - if (descr != NULL) - if ((p->descr = strdup(descr)) == NULL) - err(1, "strdup"); - - filepath_add(file); - - entity_queue++; - - /* - * Write to the queue if there's no repo or the repo has already - * been loaded else enqueue it for later. - */ - - if (rp == NULL || rp->loaded) { - entity_write_req(msgq, p); - entity_free(p); - } else - TAILQ_INSERT_TAIL(q, p, entries); -} - /* * Add a file (CER, ROA, CRL) from an MFT file, RFC 6486. * These are always relative to the directory in which "mft" sits. */ static void -queue_add_from_mft(struct msgbuf *msgq, struct entityq *q, const char *mft, +queue_add_from_mft(struct entityq *q, const char *mft, const struct mftfile *file, enum rtype type) { char *cp, *nfile; @@ -393,7 +381,7 @@ queue_add_from_mft(struct msgbuf *msgq, struct entityq *q, const char *mft, * that the repository has already been loaded. */ - entityq_add(msgq, q, nfile, type, NULL, NULL, 0, NULL); + entityq_add(q, nfile, type, NULL, NULL, 0, NULL); } /* @@ -405,8 +393,7 @@ queue_add_from_mft(struct msgbuf *msgq, struct entityq *q, const char *mft, * check the suffix anyway). */ static void -queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q, - const struct mft *mft) +queue_add_from_mft_set(struct entityq *q, const struct mft *mft) { size_t i, sz; const struct mftfile *f; @@ -417,7 +404,7 @@ queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q, assert(sz > 4); if (strcasecmp(f->file + sz - 4, ".crl")) continue; - queue_add_from_mft(msgq, q, mft->file, f, RTYPE_CRL); + queue_add_from_mft(q, mft->file, f, RTYPE_CRL); } for (i = 0; i < mft->filesz; i++) { @@ -426,7 +413,7 @@ queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q, assert(sz > 4); if (strcasecmp(f->file + sz - 4, ".cer")) continue; - queue_add_from_mft(msgq, q, mft->file, f, RTYPE_CER); + queue_add_from_mft(q, mft->file, f, RTYPE_CER); } for (i = 0; i < mft->filesz; i++) { @@ -435,7 +422,7 @@ queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q, assert(sz > 4); if (strcasecmp(f->file + sz - 4, ".roa")) continue; - queue_add_from_mft(msgq, q, mft->file, f, RTYPE_ROA); + queue_add_from_mft(q, mft->file, f, RTYPE_ROA); } for (i = 0; i < mft->filesz; i++) { @@ -444,7 +431,7 @@ queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q, assert(sz > 4); if (strcasecmp(f->file + sz - 4, ".gbr")) continue; - queue_add_from_mft(msgq, q, mft->file, f, RTYPE_GBR); + queue_add_from_mft(q, mft->file, f, RTYPE_GBR); } for (i = 0; i < mft->filesz; i++) { @@ -464,7 +451,7 @@ queue_add_from_mft_set(struct msgbuf *msgq, struct entityq *q, * Add a local TAL file (RFC 7730) to the queue of files to fetch. */ static void -queue_add_tal(struct msgbuf *msgq, struct entityq *q, const char *file) +queue_add_tal(struct entityq *q, const char *file) { char *nfile, *buf; @@ -484,7 +471,7 @@ queue_add_tal(struct msgbuf *msgq, struct entityq *q, const char *file) } /* Not in a repository, so directly add to queue. */ - entityq_add(msgq, q, nfile, RTYPE_TAL, NULL, NULL, 0, buf); + entityq_add(q, nfile, RTYPE_TAL, NULL, NULL, 0, buf); /* entityq_add makes a copy of buf */ free(buf); } @@ -493,8 +480,7 @@ queue_add_tal(struct msgbuf *msgq, struct entityq *q, const char *file) * Add URIs (CER) from a TAL file, RFC 8630. */ static void -queue_add_from_tal(struct msgbuf *procq, struct msgbuf *rsyncq, - struct entityq *q, const struct tal *tal) +queue_add_from_tal(struct entityq *q, const struct tal *tal) { char *nfile; const struct repo *repo; @@ -512,10 +498,10 @@ queue_add_from_tal(struct msgbuf *procq, struct msgbuf *rsyncq, errx(1, "TAL file has no rsync:// URI"); /* Look up the repository. */ - repo = repo_lookup(rsyncq, uri); + repo = repo_lookup(uri); nfile = repo_filename(repo, uri); - entityq_add(procq, q, nfile, RTYPE_CER, repo, tal->pkey, + entityq_add(q, nfile, RTYPE_CER, repo, tal->pkey, tal->pkeysz, tal->descr); } @@ -523,16 +509,15 @@ queue_add_from_tal(struct msgbuf *procq, struct msgbuf *rsyncq, * Add a manifest (MFT) found in an X509 certificate, RFC 6487. */ static void -queue_add_from_cert(struct msgbuf *procq, struct msgbuf *rsyncq, - struct entityq *q, const struct cert *cert) +queue_add_from_cert(struct entityq *q, const struct cert *cert) { const struct repo *repo; char *nfile; - repo = repo_lookup(rsyncq, cert->mft); + repo = repo_lookup(cert->mft); nfile = repo_filename(repo, cert->mft); - entityq_add(procq, q, nfile, RTYPE_MFT, repo, NULL, 0, NULL); + entityq_add(q, nfile, RTYPE_MFT, repo, NULL, 0, NULL); } /* @@ -542,8 +527,8 @@ queue_add_from_cert(struct msgbuf *procq, struct msgbuf *rsyncq, * In all cases, we gather statistics. */ static void -entity_process(int proc, struct msgbuf *procq, struct msgbuf *rsyncq, - struct stats *st, struct entityq *q, struct vrp_tree *tree) +entity_process(int proc, struct stats *st, struct entityq *q, + struct vrp_tree *tree) { enum rtype type; struct tal *tal; @@ -564,7 +549,7 @@ entity_process(int proc, struct msgbuf *procq, struct msgbuf *rsyncq, case RTYPE_TAL: st->tals++; tal = tal_read(proc); - queue_add_from_tal(procq, rsyncq, q, tal); + queue_add_from_tal(q, tal); tal_free(tal); break; case RTYPE_CER: @@ -582,8 +567,7 @@ entity_process(int proc, struct msgbuf *procq, struct msgbuf *rsyncq, * we're revoked and then we don't want to * process the MFT. */ - queue_add_from_cert(procq, rsyncq, - q, cert); + queue_add_from_cert(q, cert); } else st->certs_invalid++; cert_free(cert); @@ -598,7 +582,7 @@ entity_process(int proc, struct msgbuf *procq, struct msgbuf *rsyncq, mft = mft_read(proc); if (mft->stale) st->mfts_stale++; - queue_add_from_mft_set(procq, q, mft); + queue_add_from_mft_set(q, mft); mft_free(mft); break; case RTYPE_CRL: @@ -758,7 +742,6 @@ main(int argc, char *argv[]) pid_t procpid, rsyncpid; int fd[2]; struct entityq q; - struct msgbuf procq, rsyncq; struct pollfd pfd[2]; struct roa **out = NULL; char *rsync_prog = "openrsync"; @@ -959,7 +942,7 @@ main(int argc, char *argv[]) */ for (i = 0; i < talsz; i++) - queue_add_tal(&procq, &q, tals[i]); + queue_add_tal(&q, tals[i]); while (entity_queue > 0 && !killme) { pfd[0].events = POLLIN; @@ -1035,7 +1018,7 @@ main(int argc, char *argv[]) logx("%s: load from network failed, " "fallback to cache", rt.repos[i].local); stats.repos++; - entityq_flush(&procq, &q, &rt.repos[i]); + entityq_flush(&q, &rt.repos[i]); } /* @@ -1044,7 +1027,7 @@ main(int argc, char *argv[]) */ if ((pfd[1].revents & POLLIN)) { - entity_process(proc, &procq, &rsyncq, &stats, &q, &v); + entity_process(proc, &stats, &q, &v); } } -- 2.20.1