From 2defcb5299830b400a0efca8c11e567f222723dd Mon Sep 17 00:00:00 2001 From: claudio Date: Fri, 22 Oct 2021 11:13:06 +0000 Subject: [PATCH] First step of cleanup in the io land. Introduce io_buf_new() and io_buf_close(). These function will inject a size of the the buffer at the beginning of the buffer and will allow the read size to be switched to proper async IO. OK benno@ --- usr.sbin/rpki-client/extern.h | 5 +++- usr.sbin/rpki-client/http.c | 17 ++++++------ usr.sbin/rpki-client/io.c | 32 +++++++++++++++++++++-- usr.sbin/rpki-client/main.c | 49 +++++++++++++++++------------------ usr.sbin/rpki-client/parser.c | 9 +++---- usr.sbin/rpki-client/rrdp.c | 27 +++++++++---------- usr.sbin/rpki-client/rsync.c | 28 +++++++------------- 7 files changed, 92 insertions(+), 75 deletions(-) diff --git a/usr.sbin/rpki-client/extern.h b/usr.sbin/rpki-client/extern.h index a740981feec..1923756216d 100644 --- a/usr.sbin/rpki-client/extern.h +++ b/usr.sbin/rpki-client/extern.h @@ -1,4 +1,4 @@ -/* $OpenBSD: extern.h,v 1.72 2021/10/12 15:16:45 job Exp $ */ +/* $OpenBSD: extern.h,v 1.73 2021/10/22 11:13:06 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -388,6 +388,7 @@ struct stats { }; struct ibuf; +struct msgbuf; /* global variables */ extern int verbose; @@ -536,9 +537,11 @@ char *hex_encode(const unsigned char *, size_t); void io_socket_blocking(int); void io_socket_nonblocking(int); +struct ibuf *io_buf_new(void); void io_simple_buffer(struct ibuf *, const void *, size_t); void io_buf_buffer(struct ibuf *, const void *, size_t); void io_str_buffer(struct ibuf *, const char *); +void io_buf_close(struct msgbuf *, struct ibuf *); void io_simple_read(int, void *, size_t); void io_buf_read_alloc(int, void **, size_t *); void io_str_read(int, char **); diff --git a/usr.sbin/rpki-client/http.c b/usr.sbin/rpki-client/http.c index 67c17f19869..ffd56a16d4f 100644 --- a/usr.sbin/rpki-client/http.c +++ b/usr.sbin/rpki-client/http.c @@ -1,4 +1,4 @@ -/* $OpenBSD: http.c,v 1.42 2021/10/05 07:22:21 claudio Exp $ */ +/* $OpenBSD: http.c,v 1.43 2021/10/22 11:13:06 claudio Exp $ */ /* * Copyright (c) 2020 Nils Fisher * Copyright (c) 2020 Claudio Jeker @@ -569,12 +569,11 @@ http_req_done(size_t id, enum http_result res, const char *last_modified) { struct ibuf *b; - if ((b = ibuf_dynamic(64, UINT_MAX)) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &id, sizeof(id)); io_simple_buffer(b, &res, sizeof(res)); io_str_buffer(b, last_modified); - ibuf_close(&msgq, b); + io_buf_close(&msgq, b); } /* @@ -586,12 +585,11 @@ http_req_fail(size_t id) struct ibuf *b; enum http_result res = HTTP_FAILED; - if ((b = ibuf_dynamic(8, UINT_MAX)) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &id, sizeof(id)); io_simple_buffer(b, &res, sizeof(res)); io_str_buffer(b, NULL); - ibuf_close(&msgq, b); + io_buf_close(&msgq, b); } /* @@ -1861,12 +1859,13 @@ proc_http(char *bind_addr, int fd) } } if (pfds[0].revents & POLLIN) { - size_t id; + size_t id, size; int outfd; char *uri; char *mod; - outfd = io_recvfd(fd, &id, sizeof(id)); + outfd = io_recvfd(fd, &size, sizeof(size)); + io_simple_read(fd, &id, sizeof(id)); io_str_read(fd, &uri); io_str_read(fd, &mod); diff --git a/usr.sbin/rpki-client/io.c b/usr.sbin/rpki-client/io.c index 507a2d6b77b..e22423df22d 100644 --- a/usr.sbin/rpki-client/io.c +++ b/usr.sbin/rpki-client/io.c @@ -1,4 +1,4 @@ -/* $OpenBSD: io.c,v 1.13 2021/03/04 13:01:41 claudio Exp $ */ +/* $OpenBSD: io.c,v 1.14 2021/10/22 11:13:06 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -53,7 +53,22 @@ io_socket_nonblocking(int fd) } /* - * Like io_simple_write() but into a buffer. + * Create new io buffer, call io_close() when done with it. + * Function always returns a new buffer. + */ +struct ibuf * +io_buf_new(void) +{ + struct ibuf *b; + + if ((b = ibuf_dynamic(64, INT32_MAX)) == NULL) + err(1, NULL); + ibuf_reserve(b, sizeof(size_t)); /* can not fail */ + return b; +} + +/* + * Add a simple object of static size to the io buffer. */ void io_simple_buffer(struct ibuf *b, const void *res, size_t sz) @@ -86,6 +101,19 @@ io_str_buffer(struct ibuf *b, const char *p) io_buf_buffer(b, p, sz); } +/* + * Finish and enqueue a io buffer. + */ +void +io_buf_close(struct msgbuf *msgbuf, struct ibuf *b) +{ + size_t len; + + len = ibuf_size(b); + memcpy(ibuf_seek(b, 0, sizeof(len)), &len, sizeof(len)); + ibuf_close(msgbuf, b); +} + /* * Read of a binary buffer that must be on a blocking descriptor. * Does nothing if "sz" is zero. diff --git a/usr.sbin/rpki-client/main.c b/usr.sbin/rpki-client/main.c index bac50257ae9..558d5837950 100644 --- a/usr.sbin/rpki-client/main.c +++ b/usr.sbin/rpki-client/main.c @@ -1,4 +1,4 @@ -/* $OpenBSD: main.c,v 1.149 2021/10/11 16:50:03 job Exp $ */ +/* $OpenBSD: main.c,v 1.150 2021/10/22 11:13:06 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -84,7 +84,6 @@ logx(const char *fmt, ...) void entity_free(struct entity *ent) { - if (ent == NULL) return; @@ -102,7 +101,9 @@ entity_free(struct entity *ent) void entity_read_req(int fd, struct entity *ent) { + size_t size; + io_simple_read(fd, &size, sizeof(size)); io_simple_read(fd, &ent->type, sizeof(enum rtype)); io_str_read(fd, &ent->file); io_simple_read(fd, &ent->has_pkey, sizeof(int)); @@ -126,15 +127,14 @@ entity_write_req(const struct entity *ent) return; } - if ((b = ibuf_dynamic(sizeof(*ent), UINT_MAX)) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &ent->type, sizeof(ent->type)); io_str_buffer(b, ent->file); io_simple_buffer(b, &ent->has_pkey, sizeof(int)); if (ent->has_pkey) io_buf_buffer(b, ent->pkey, ent->pkeysz); io_str_buffer(b, ent->descr); - ibuf_close(&procq, b); + io_buf_close(&procq, b); } /* @@ -224,12 +224,11 @@ rrdp_file_resp(size_t id, int ok) enum rrdp_msg type = RRDP_FILE; struct ibuf *b; - if ((b = ibuf_open(sizeof(type) + sizeof(id) + sizeof(ok))) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &type, sizeof(type)); io_simple_buffer(b, &id, sizeof(id)); io_simple_buffer(b, &ok, sizeof(ok)); - ibuf_close(&rrdpq, b); + io_buf_close(&rrdpq, b); } void @@ -239,8 +238,7 @@ rrdp_fetch(size_t id, const char *uri, const char *local, enum rrdp_msg type = RRDP_START; struct ibuf *b; - if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &type, sizeof(type)); io_simple_buffer(b, &id, sizeof(id)); io_str_buffer(b, local); @@ -248,7 +246,7 @@ rrdp_fetch(size_t id, const char *uri, const char *local, io_str_buffer(b, s->session_id); io_simple_buffer(b, &s->serial, sizeof(s->serial)); io_str_buffer(b, s->last_mod); - ibuf_close(&rrdpq, b); + io_buf_close(&rrdpq, b); } /* @@ -259,12 +257,11 @@ rsync_fetch(size_t id, const char *uri, const char *local) { struct ibuf *b; - if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &id, sizeof(id)); io_str_buffer(b, local); io_str_buffer(b, uri); - ibuf_close(&rsyncq, b); + io_buf_close(&rsyncq, b); } /* @@ -275,14 +272,13 @@ http_fetch(size_t id, const char *uri, const char *last_mod, int fd) { struct ibuf *b; - if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &id, sizeof(id)); io_str_buffer(b, uri); io_str_buffer(b, last_mod); /* pass file as fd */ b->fd = fd; - ibuf_close(&httpq, b); + io_buf_close(&httpq, b); } /* @@ -299,12 +295,11 @@ rrdp_http_fetch(size_t id, const char *uri, const char *last_mod) if (pipe2(pi, O_CLOEXEC | O_NONBLOCK) == -1) err(1, "pipe"); - if ((b = ibuf_open(sizeof(type) + sizeof(id))) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &type, sizeof(type)); io_simple_buffer(b, &id, sizeof(id)); b->fd = pi[0]; - ibuf_close(&rrdpq, b); + io_buf_close(&rrdpq, b); http_fetch(id, uri, last_mod, pi[1]); } @@ -316,13 +311,12 @@ rrdp_http_done(size_t id, enum http_result res, const char *last_mod) struct ibuf *b; /* RRDP request, relay response over to the rrdp process */ - if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &type, sizeof(type)); io_simple_buffer(b, &id, sizeof(id)); io_simple_buffer(b, &res, sizeof(res)); io_str_buffer(b, last_mod); - ibuf_close(&rrdpq, b); + io_buf_close(&rrdpq, b); } /* @@ -473,6 +467,7 @@ entity_process(int proc, struct stats *st, struct vrp_tree *tree, struct cert *cert; struct mft *mft; struct roa *roa; + size_t size; int c; /* @@ -481,6 +476,7 @@ entity_process(int proc, struct stats *st, struct vrp_tree *tree, * certificate, for example). * We follow that up with whether the resources didn't parse. */ + io_simple_read(proc, &size, sizeof(size)); io_simple_read(proc, &type, sizeof(type)); switch (type) { @@ -553,7 +549,7 @@ entity_process(int proc, struct stats *st, struct vrp_tree *tree, st->gbrs++; break; default: - errx(1, "unknown entity type"); + errx(1, "unknown entity type %d", type); } entity_queue--; @@ -629,7 +625,7 @@ main(int argc, char *argv[]) { int rc, c, st, proc, rsync, http, rrdp, ok, hangup = 0, fl = SOCK_STREAM | SOCK_CLOEXEC; - size_t i, id, talsz = 0; + size_t i, id, talsz = 0, size; pid_t pid, procpid, rsyncpid, httppid, rrdppid; int fd[2]; struct pollfd pfd[NPFD]; @@ -1004,6 +1000,7 @@ main(int argc, char *argv[]) */ if ((pfd[1].revents & POLLIN)) { + io_simple_read(rsync, &size, sizeof(size)); io_simple_read(rsync, &id, sizeof(id)); io_simple_read(rsync, &ok, sizeof(ok)); rsync_finish(id, ok); @@ -1013,6 +1010,7 @@ main(int argc, char *argv[]) enum http_result res; char *last_mod; + io_simple_read(http, &size, sizeof(size)); io_simple_read(http, &id, sizeof(id)); io_simple_read(http, &res, sizeof(res)); io_str_read(http, &last_mod); @@ -1031,6 +1029,7 @@ main(int argc, char *argv[]) char hash[SHA256_DIGEST_LENGTH]; size_t dsz; + io_simple_read(rrdp, &size, sizeof(size)); io_simple_read(rrdp, &type, sizeof(type)); io_simple_read(rrdp, &id, sizeof(id)); diff --git a/usr.sbin/rpki-client/parser.c b/usr.sbin/rpki-client/parser.c index cecec5f6c7c..796879094a1 100644 --- a/usr.sbin/rpki-client/parser.c +++ b/usr.sbin/rpki-client/parser.c @@ -1,4 +1,4 @@ -/* $OpenBSD: parser.c,v 1.13 2021/10/11 16:50:03 job Exp $ */ +/* $OpenBSD: parser.c,v 1.14 2021/10/22 11:13:06 claudio Exp $ */ /* * Copyright (c) 2019 Claudio Jeker * Copyright (c) 2019 Kristaps Dzonsons @@ -523,13 +523,13 @@ proc_parser(int fd) struct roa *roa; struct entity *entp; struct entityq q; - int c, rc = 1; struct msgbuf msgq; struct pollfd pfd; struct ibuf *b; X509_STORE_CTX *ctx; struct auth_tree auths = RB_INITIALIZER(&auths); struct crl_tree crlt = RB_INITIALIZER(&crlt); + int c, rc = 1; ERR_load_crypto_strings(); OpenSSL_add_all_ciphers(); @@ -602,8 +602,7 @@ proc_parser(int fd) entp = TAILQ_FIRST(&q); assert(entp != NULL); - if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &entp->type, sizeof(entp->type)); switch (entp->type) { @@ -656,7 +655,7 @@ proc_parser(int fd) abort(); } - ibuf_close(&msgq, b); + io_buf_close(&msgq, b); TAILQ_REMOVE(&q, entp, entries); entity_free(entp); } diff --git a/usr.sbin/rpki-client/rrdp.c b/usr.sbin/rpki-client/rrdp.c index d2fb0633738..e1bb138fc84 100644 --- a/usr.sbin/rpki-client/rrdp.c +++ b/usr.sbin/rpki-client/rrdp.c @@ -1,4 +1,4 @@ -/* $OpenBSD: rrdp.c,v 1.11 2021/08/31 15:18:53 claudio Exp $ */ +/* $OpenBSD: rrdp.c,v 1.12 2021/10/22 11:13:06 claudio Exp $ */ /* * Copyright (c) 2020 Nils Fisher * Copyright (c) 2021 Claudio Jeker @@ -140,12 +140,11 @@ rrdp_done(size_t id, int ok) enum rrdp_msg type = RRDP_END; struct ibuf *b; - if ((b = ibuf_open(sizeof(type) + sizeof(id) + sizeof(ok))) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &type, sizeof(type)); io_simple_buffer(b, &id, sizeof(id)); io_simple_buffer(b, &ok, sizeof(ok)); - ibuf_close(&msgq, b); + io_buf_close(&msgq, b); } /* @@ -162,13 +161,12 @@ rrdp_http_req(size_t id, const char *uri, const char *last_mod) enum rrdp_msg type = RRDP_HTTP_REQ; struct ibuf *b; - if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &type, sizeof(type)); io_simple_buffer(b, &id, sizeof(id)); io_str_buffer(b, uri); io_str_buffer(b, last_mod); - ibuf_close(&msgq, b); + io_buf_close(&msgq, b); } /* @@ -180,14 +178,13 @@ rrdp_state_send(struct rrdp *s) enum rrdp_msg type = RRDP_SESSION; struct ibuf *b; - if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &type, sizeof(type)); io_simple_buffer(b, &s->id, sizeof(s->id)); io_str_buffer(b, s->current.session_id); io_simple_buffer(b, &s->current.serial, sizeof(s->current.serial)); io_str_buffer(b, s->current.last_mod); - ibuf_close(&msgq, b); + io_buf_close(&msgq, b); } static struct rrdp * @@ -386,10 +383,11 @@ rrdp_input_handler(int fd) enum rrdp_msg type; enum http_result res; long long serial; - size_t id; + size_t id, size; int infd, ok; - infd = io_recvfd(fd, &type, sizeof(type)); + infd = io_recvfd(fd, &size, sizeof(size)); + io_simple_read(fd, &type, sizeof(type)); io_simple_read(fd, &id, sizeof(id)); switch (type) { @@ -665,8 +663,7 @@ publish_done(struct rrdp *s, struct publish_xml *pxml) /* only send files if the fetch did not fail already */ if (s->file_failed == 0) { - if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &type, sizeof(type)); io_simple_buffer(b, &s->id, sizeof(s->id)); io_simple_buffer(b, &pxml->type, sizeof(pxml->type)); @@ -674,7 +671,7 @@ publish_done(struct rrdp *s, struct publish_xml *pxml) io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash)); io_str_buffer(b, pxml->uri); io_buf_buffer(b, data, datasz); - ibuf_close(&msgq, b); + io_buf_close(&msgq, b); s->file_pending++; } diff --git a/usr.sbin/rpki-client/rsync.c b/usr.sbin/rpki-client/rsync.c index 740d0ada53b..6b115447cdb 100644 --- a/usr.sbin/rpki-client/rsync.c +++ b/usr.sbin/rpki-client/rsync.c @@ -1,4 +1,4 @@ -/* $OpenBSD: rsync.c,v 1.25 2021/09/01 12:26:26 claudio Exp $ */ +/* $OpenBSD: rsync.c,v 1.26 2021/10/22 11:13:06 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -178,8 +178,7 @@ proc_rsync(char *prog, char *bind_addr, int fd) for (;;) { char *uri = NULL, *dst = NULL; - ssize_t ssz; - size_t id; + size_t id, size; pid_t pid; int st; @@ -217,12 +216,10 @@ proc_rsync(char *prog, char *bind_addr, int fd) ok = 0; } - b = ibuf_open(sizeof(size_t) + sizeof(ok)); - if (b == NULL) - err(1, NULL); + b = io_buf_new(); io_simple_buffer(b, &ids[i].id, sizeof(size_t)); io_simple_buffer(b, &ok, sizeof(ok)); - ibuf_close(&msgq, b); + io_buf_close(&msgq, b); free(ids[i].uri); ids[i].uri = NULL; @@ -243,21 +240,16 @@ proc_rsync(char *prog, char *bind_addr, int fd) } } + /* connection closed */ + if (pfd.revents & POLLHUP) + break; + if (!(pfd.revents & POLLIN)) continue; - /* - * Read til the parent exits. - * That will mean that we can safely exit. - */ - - if ((ssz = read(fd, &id, sizeof(size_t))) == -1) - err(1, "read"); - if (ssz == 0) - break; - /* Read host and module. */ - + io_simple_read(fd, &size, sizeof(size)); + io_simple_read(fd, &id, sizeof(id)); io_str_read(fd, &dst); io_str_read(fd, &uri); assert(dst); -- 2.20.1