First step of cleanup in the io land. Introduce io_buf_new() and
authorclaudio <claudio@openbsd.org>
Fri, 22 Oct 2021 11:13:06 +0000 (11:13 +0000)
committerclaudio <claudio@openbsd.org>
Fri, 22 Oct 2021 11:13:06 +0000 (11:13 +0000)
io_buf_close(). These function will inject a size of the the buffer
at the beginning of the buffer and will allow the read size to be
switched to proper async IO.
OK benno@

usr.sbin/rpki-client/extern.h
usr.sbin/rpki-client/http.c
usr.sbin/rpki-client/io.c
usr.sbin/rpki-client/main.c
usr.sbin/rpki-client/parser.c
usr.sbin/rpki-client/rrdp.c
usr.sbin/rpki-client/rsync.c

index a740981..1923756 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: extern.h,v 1.72 2021/10/12 15:16:45 job Exp $ */
+/*     $OpenBSD: extern.h,v 1.73 2021/10/22 11:13:06 claudio Exp $ */
 /*
  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
  *
@@ -388,6 +388,7 @@ struct stats {
 };
 
 struct ibuf;
+struct msgbuf;
 
 /* global variables */
 extern int verbose;
@@ -536,9 +537,11 @@ char               *hex_encode(const unsigned char *, size_t);
 
 void            io_socket_blocking(int);
 void            io_socket_nonblocking(int);
+struct ibuf    *io_buf_new(void);
 void            io_simple_buffer(struct ibuf *, const void *, size_t);
 void            io_buf_buffer(struct ibuf *, const void *, size_t);
 void            io_str_buffer(struct ibuf *, const char *);
+void            io_buf_close(struct msgbuf *, struct ibuf *);
 void            io_simple_read(int, void *, size_t);
 void            io_buf_read_alloc(int, void **, size_t *);
 void            io_str_read(int, char **);
index 67c17f1..ffd56a1 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: http.c,v 1.42 2021/10/05 07:22:21 claudio Exp $  */
+/*     $OpenBSD: http.c,v 1.43 2021/10/22 11:13:06 claudio Exp $  */
 /*
  * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
  * Copyright (c) 2020 Claudio Jeker <claudio@openbsd.org>
@@ -569,12 +569,11 @@ http_req_done(size_t id, enum http_result res, const char *last_modified)
 {
        struct ibuf *b;
 
-       if ((b = ibuf_dynamic(64, UINT_MAX)) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &id, sizeof(id));
        io_simple_buffer(b, &res, sizeof(res));
        io_str_buffer(b, last_modified);
-       ibuf_close(&msgq, b);
+       io_buf_close(&msgq, b);
 }
 
 /*
@@ -586,12 +585,11 @@ http_req_fail(size_t id)
        struct ibuf *b;
        enum http_result res = HTTP_FAILED;
 
-       if ((b = ibuf_dynamic(8, UINT_MAX)) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &id, sizeof(id));
        io_simple_buffer(b, &res, sizeof(res));
        io_str_buffer(b, NULL);
-       ibuf_close(&msgq, b);
+       io_buf_close(&msgq, b);
 }
 
 /*
@@ -1861,12 +1859,13 @@ proc_http(char *bind_addr, int fd)
                        }
                }
                if (pfds[0].revents & POLLIN) {
-                       size_t id;
+                       size_t id, size;
                        int outfd;
                        char *uri;
                        char *mod;
 
-                       outfd = io_recvfd(fd, &id, sizeof(id));
+                       outfd = io_recvfd(fd, &size, sizeof(size));
+                       io_simple_read(fd, &id, sizeof(id));
                        io_str_read(fd, &uri);
                        io_str_read(fd, &mod);
 
index 507a2d6..e22423d 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: io.c,v 1.13 2021/03/04 13:01:41 claudio Exp $ */
+/*     $OpenBSD: io.c,v 1.14 2021/10/22 11:13:06 claudio Exp $ */
 /*
  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
  *
@@ -53,7 +53,22 @@ io_socket_nonblocking(int fd)
 }
 
 /*
- * Like io_simple_write() but into a buffer.
+ * Create new io buffer, call io_close() when done with it.
+ * Function always returns a new buffer.
+ */
+struct ibuf *
+io_buf_new(void)
+{
+       struct ibuf *b;
+
+       if ((b = ibuf_dynamic(64, INT32_MAX)) == NULL)
+               err(1, NULL);
+       ibuf_reserve(b, sizeof(size_t));        /* can not fail */
+       return b;
+}
+
+/*
+ * Add a simple object of static size to the io buffer.
  */
 void
 io_simple_buffer(struct ibuf *b, const void *res, size_t sz)
@@ -86,6 +101,19 @@ io_str_buffer(struct ibuf *b, const char *p)
        io_buf_buffer(b, p, sz);
 }
 
+/*
+ * Finish and enqueue a io buffer.
+ */
+void
+io_buf_close(struct msgbuf *msgbuf, struct ibuf *b)
+{
+       size_t len;
+
+       len = ibuf_size(b);
+       memcpy(ibuf_seek(b, 0, sizeof(len)), &len, sizeof(len));
+       ibuf_close(msgbuf, b);
+}
+
 /*
  * Read of a binary buffer that must be on a blocking descriptor.
  * Does nothing if "sz" is zero.
index bac5025..558d583 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: main.c,v 1.149 2021/10/11 16:50:03 job Exp $ */
+/*     $OpenBSD: main.c,v 1.150 2021/10/22 11:13:06 claudio Exp $ */
 /*
  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
  *
@@ -84,7 +84,6 @@ logx(const char *fmt, ...)
 void
 entity_free(struct entity *ent)
 {
-
        if (ent == NULL)
                return;
 
@@ -102,7 +101,9 @@ entity_free(struct entity *ent)
 void
 entity_read_req(int fd, struct entity *ent)
 {
+       size_t size;
 
+       io_simple_read(fd, &size, sizeof(size));
        io_simple_read(fd, &ent->type, sizeof(enum rtype));
        io_str_read(fd, &ent->file);
        io_simple_read(fd, &ent->has_pkey, sizeof(int));
@@ -126,15 +127,14 @@ entity_write_req(const struct entity *ent)
                return;
        }
 
-       if ((b = ibuf_dynamic(sizeof(*ent), UINT_MAX)) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &ent->type, sizeof(ent->type));
        io_str_buffer(b, ent->file);
        io_simple_buffer(b, &ent->has_pkey, sizeof(int));
        if (ent->has_pkey)
                io_buf_buffer(b, ent->pkey, ent->pkeysz);
        io_str_buffer(b, ent->descr);
-       ibuf_close(&procq, b);
+       io_buf_close(&procq, b);
 }
 
 /*
@@ -224,12 +224,11 @@ rrdp_file_resp(size_t id, int ok)
        enum rrdp_msg type = RRDP_FILE;
        struct ibuf *b;
 
-       if ((b = ibuf_open(sizeof(type) + sizeof(id) + sizeof(ok))) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &type, sizeof(type));
        io_simple_buffer(b, &id, sizeof(id));
        io_simple_buffer(b, &ok, sizeof(ok));
-       ibuf_close(&rrdpq, b);
+       io_buf_close(&rrdpq, b);
 }
 
 void
@@ -239,8 +238,7 @@ rrdp_fetch(size_t id, const char *uri, const char *local,
        enum rrdp_msg type = RRDP_START;
        struct ibuf *b;
 
-       if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &type, sizeof(type));
        io_simple_buffer(b, &id, sizeof(id));
        io_str_buffer(b, local);
@@ -248,7 +246,7 @@ rrdp_fetch(size_t id, const char *uri, const char *local,
        io_str_buffer(b, s->session_id);
        io_simple_buffer(b, &s->serial, sizeof(s->serial));
        io_str_buffer(b, s->last_mod);
-       ibuf_close(&rrdpq, b);
+       io_buf_close(&rrdpq, b);
 }
 
 /*
@@ -259,12 +257,11 @@ rsync_fetch(size_t id, const char *uri, const char *local)
 {
        struct ibuf     *b;
 
-       if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &id, sizeof(id));
        io_str_buffer(b, local);
        io_str_buffer(b, uri);
-       ibuf_close(&rsyncq, b);
+       io_buf_close(&rsyncq, b);
 }
 
 /*
@@ -275,14 +272,13 @@ http_fetch(size_t id, const char *uri, const char *last_mod, int fd)
 {
        struct ibuf     *b;
 
-       if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &id, sizeof(id));
        io_str_buffer(b, uri);
        io_str_buffer(b, last_mod);
        /* pass file as fd */
        b->fd = fd;
-       ibuf_close(&httpq, b);
+       io_buf_close(&httpq, b);
 }
 
 /*
@@ -299,12 +295,11 @@ rrdp_http_fetch(size_t id, const char *uri, const char *last_mod)
        if (pipe2(pi, O_CLOEXEC | O_NONBLOCK) == -1)
                err(1, "pipe");
 
-       if ((b = ibuf_open(sizeof(type) + sizeof(id))) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &type, sizeof(type));
        io_simple_buffer(b, &id, sizeof(id));
        b->fd = pi[0];
-       ibuf_close(&rrdpq, b);
+       io_buf_close(&rrdpq, b);
 
        http_fetch(id, uri, last_mod, pi[1]);
 }
@@ -316,13 +311,12 @@ rrdp_http_done(size_t id, enum http_result res, const char *last_mod)
        struct ibuf *b;
 
        /* RRDP request, relay response over to the rrdp process */
-       if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &type, sizeof(type));
        io_simple_buffer(b, &id, sizeof(id));
        io_simple_buffer(b, &res, sizeof(res));
        io_str_buffer(b, last_mod);
-       ibuf_close(&rrdpq, b);
+       io_buf_close(&rrdpq, b);
 }
 
 /*
@@ -473,6 +467,7 @@ entity_process(int proc, struct stats *st, struct vrp_tree *tree,
        struct cert     *cert;
        struct mft      *mft;
        struct roa      *roa;
+       size_t           size;
        int              c;
 
        /*
@@ -481,6 +476,7 @@ entity_process(int proc, struct stats *st, struct vrp_tree *tree,
         * certificate, for example).
         * We follow that up with whether the resources didn't parse.
         */
+       io_simple_read(proc, &size, sizeof(size));
        io_simple_read(proc, &type, sizeof(type));
 
        switch (type) {
@@ -553,7 +549,7 @@ entity_process(int proc, struct stats *st, struct vrp_tree *tree,
                st->gbrs++;
                break;
        default:
-               errx(1, "unknown entity type");
+               errx(1, "unknown entity type %d", type);
        }
 
        entity_queue--;
@@ -629,7 +625,7 @@ main(int argc, char *argv[])
 {
        int              rc, c, st, proc, rsync, http, rrdp, ok,
                         hangup = 0, fl = SOCK_STREAM | SOCK_CLOEXEC;
-       size_t           i, id, talsz = 0;
+       size_t           i, id, talsz = 0, size;
        pid_t            pid, procpid, rsyncpid, httppid, rrdppid;
        int              fd[2];
        struct pollfd    pfd[NPFD];
@@ -1004,6 +1000,7 @@ main(int argc, char *argv[])
                 */
 
                if ((pfd[1].revents & POLLIN)) {
+                       io_simple_read(rsync, &size, sizeof(size));
                        io_simple_read(rsync, &id, sizeof(id));
                        io_simple_read(rsync, &ok, sizeof(ok));
                        rsync_finish(id, ok);
@@ -1013,6 +1010,7 @@ main(int argc, char *argv[])
                        enum http_result res;
                        char *last_mod;
 
+                       io_simple_read(http, &size, sizeof(size));
                        io_simple_read(http, &id, sizeof(id));
                        io_simple_read(http, &res, sizeof(res));
                        io_str_read(http, &last_mod);
@@ -1031,6 +1029,7 @@ main(int argc, char *argv[])
                        char hash[SHA256_DIGEST_LENGTH];
                        size_t dsz;
 
+                       io_simple_read(rrdp, &size, sizeof(size));
                        io_simple_read(rrdp, &type, sizeof(type));
                        io_simple_read(rrdp, &id, sizeof(id));
 
index cecec5f..7968790 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: parser.c,v 1.13 2021/10/11 16:50:03 job Exp $ */
+/*     $OpenBSD: parser.c,v 1.14 2021/10/22 11:13:06 claudio Exp $ */
 /*
  * Copyright (c) 2019 Claudio Jeker <claudio@openbsd.org>
  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
@@ -523,13 +523,13 @@ proc_parser(int fd)
        struct roa      *roa;
        struct entity   *entp;
        struct entityq   q;
-       int              c, rc = 1;
        struct msgbuf    msgq;
        struct pollfd    pfd;
        struct ibuf     *b;
        X509_STORE_CTX  *ctx;
        struct auth_tree auths = RB_INITIALIZER(&auths);
        struct crl_tree  crlt = RB_INITIALIZER(&crlt);
+       int              c, rc = 1;
 
        ERR_load_crypto_strings();
        OpenSSL_add_all_ciphers();
@@ -602,8 +602,7 @@ proc_parser(int fd)
                entp = TAILQ_FIRST(&q);
                assert(entp != NULL);
 
-               if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
-                       err(1, NULL);
+               b = io_buf_new();
                io_simple_buffer(b, &entp->type, sizeof(entp->type));
 
                switch (entp->type) {
@@ -656,7 +655,7 @@ proc_parser(int fd)
                        abort();
                }
 
-               ibuf_close(&msgq, b);
+               io_buf_close(&msgq, b);
                TAILQ_REMOVE(&q, entp, entries);
                entity_free(entp);
        }
index d2fb063..e1bb138 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: rrdp.c,v 1.11 2021/08/31 15:18:53 claudio Exp $ */
+/*     $OpenBSD: rrdp.c,v 1.12 2021/10/22 11:13:06 claudio Exp $ */
 /*
  * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
  * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
@@ -140,12 +140,11 @@ rrdp_done(size_t id, int ok)
        enum rrdp_msg type = RRDP_END;
        struct ibuf *b;
 
-       if ((b = ibuf_open(sizeof(type) + sizeof(id) + sizeof(ok))) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &type, sizeof(type));
        io_simple_buffer(b, &id, sizeof(id));
        io_simple_buffer(b, &ok, sizeof(ok));
-       ibuf_close(&msgq, b);
+       io_buf_close(&msgq, b);
 }
 
 /*
@@ -162,13 +161,12 @@ rrdp_http_req(size_t id, const char *uri, const char *last_mod)
        enum rrdp_msg type = RRDP_HTTP_REQ;
        struct ibuf *b;
 
-       if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &type, sizeof(type));
        io_simple_buffer(b, &id, sizeof(id));
        io_str_buffer(b, uri);
        io_str_buffer(b, last_mod);
-       ibuf_close(&msgq, b);
+       io_buf_close(&msgq, b);
 }
 
 /*
@@ -180,14 +178,13 @@ rrdp_state_send(struct rrdp *s)
        enum rrdp_msg type = RRDP_SESSION;
        struct ibuf *b;
 
-       if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
-               err(1, NULL);
+       b = io_buf_new();
        io_simple_buffer(b, &type, sizeof(type));
        io_simple_buffer(b, &s->id, sizeof(s->id));
        io_str_buffer(b, s->current.session_id);
        io_simple_buffer(b, &s->current.serial, sizeof(s->current.serial));
        io_str_buffer(b, s->current.last_mod);
-       ibuf_close(&msgq, b);
+       io_buf_close(&msgq, b);
 }
 
 static struct rrdp *
@@ -386,10 +383,11 @@ rrdp_input_handler(int fd)
        enum rrdp_msg type;
        enum http_result res;
        long long serial;
-       size_t id;
+       size_t id, size;
        int infd, ok;
 
-       infd = io_recvfd(fd, &type, sizeof(type));
+       infd = io_recvfd(fd, &size, sizeof(size));
+       io_simple_read(fd, &type, sizeof(type));
        io_simple_read(fd, &id, sizeof(id));
 
        switch (type) {
@@ -665,8 +663,7 @@ publish_done(struct rrdp *s, struct publish_xml *pxml)
 
        /* only send files if the fetch did not fail already */
        if (s->file_failed == 0) {
-               if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
-                       err(1, NULL);
+               b = io_buf_new();
                io_simple_buffer(b, &type, sizeof(type));
                io_simple_buffer(b, &s->id, sizeof(s->id));
                io_simple_buffer(b, &pxml->type, sizeof(pxml->type));
@@ -674,7 +671,7 @@ publish_done(struct rrdp *s, struct publish_xml *pxml)
                        io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash));
                io_str_buffer(b, pxml->uri);
                io_buf_buffer(b, data, datasz);
-               ibuf_close(&msgq, b);
+               io_buf_close(&msgq, b);
                s->file_pending++;
        }
 
index 740d0ad..6b11544 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: rsync.c,v 1.25 2021/09/01 12:26:26 claudio Exp $ */
+/*     $OpenBSD: rsync.c,v 1.26 2021/10/22 11:13:06 claudio Exp $ */
 /*
  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
  *
@@ -178,8 +178,7 @@ proc_rsync(char *prog, char *bind_addr, int fd)
 
        for (;;) {
                char *uri = NULL, *dst = NULL;
-               ssize_t ssz;
-               size_t id;
+               size_t id, size;
                pid_t pid;
                int st;
 
@@ -217,12 +216,10 @@ proc_rsync(char *prog, char *bind_addr, int fd)
                                        ok = 0;
                                }
 
-                               b = ibuf_open(sizeof(size_t) + sizeof(ok));
-                               if (b == NULL)
-                                       err(1, NULL);
+                               b = io_buf_new();
                                io_simple_buffer(b, &ids[i].id, sizeof(size_t));
                                io_simple_buffer(b, &ok, sizeof(ok));
-                               ibuf_close(&msgq, b);
+                               io_buf_close(&msgq, b);
 
                                free(ids[i].uri);
                                ids[i].uri = NULL;
@@ -243,21 +240,16 @@ proc_rsync(char *prog, char *bind_addr, int fd)
                        }
                }
 
+               /* connection closed */
+               if (pfd.revents & POLLHUP)
+                       break;
+
                if (!(pfd.revents & POLLIN))
                        continue;
 
-               /*
-                * Read til the parent exits.
-                * That will mean that we can safely exit.
-                */
-
-               if ((ssz = read(fd, &id, sizeof(size_t))) == -1)
-                       err(1, "read");
-               if (ssz == 0)
-                       break;
-
                /* Read host and module. */
-
+               io_simple_read(fd, &size, sizeof(size));
+               io_simple_read(fd, &id, sizeof(id));
                io_str_read(fd, &dst);
                io_str_read(fd, &uri);
                assert(dst);