From: claudio Date: Sat, 23 Oct 2021 16:06:04 +0000 (+0000) Subject: Finnally move away from blocking reads in rpki-client. The code was a X-Git-Url: http://artulab.com/gitweb/?a=commitdiff_plain;h=7eb79a4a63c58a6abbcfe3ffb1e4a81b2024d015;p=openbsd Finnally move away from blocking reads in rpki-client. The code was a mish mash of poll, non-blocking writes and blocking reads. Using the introduced ibuf size header in io_buf_new()/io_buf_close() the read side can be changed to pull in a full ibuf and only start the un-marshal once all data has been read. OK benno@ --- diff --git a/usr.sbin/rpki-client/cert.c b/usr.sbin/rpki-client/cert.c index f1b81580a5d..7a384194f08 100644 --- a/usr.sbin/rpki-client/cert.c +++ b/usr.sbin/rpki-client/cert.c @@ -1,4 +1,4 @@ -/* $OpenBSD: cert.c,v 1.39 2021/10/15 22:30:33 job Exp $ */ +/* $OpenBSD: cert.c,v 1.40 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2021 Job Snijders * Copyright (c) 2019 Kristaps Dzonsons @@ -1281,33 +1281,31 @@ cert_buffer(struct ibuf *b, const struct cert *p) } static void -cert_ip_read(int fd, struct cert_ip *p) +cert_ip_read(struct ibuf *b, struct cert_ip *p) { - - io_simple_read(fd, &p->afi, sizeof(enum afi)); - io_simple_read(fd, &p->type, sizeof(enum cert_ip_type)); + io_read_buf(b, &p->afi, sizeof(enum afi)); + io_read_buf(b, &p->type, sizeof(enum cert_ip_type)); if (p->type != CERT_IP_INHERIT) { - io_simple_read(fd, &p->min, sizeof(p->min)); - io_simple_read(fd, &p->max, sizeof(p->max)); + io_read_buf(b, &p->min, sizeof(p->min)); + io_read_buf(b, &p->max, sizeof(p->max)); } if (p->type == CERT_IP_RANGE) - ip_addr_range_read(fd, &p->range); + ip_addr_range_read(b, &p->range); else if (p->type == CERT_IP_ADDR) - ip_addr_read(fd, &p->ip); + ip_addr_read(b, &p->ip); } static void -cert_as_read(int fd, struct cert_as *p) +cert_as_read(struct ibuf *b, struct cert_as *p) { - - io_simple_read(fd, &p->type, sizeof(enum cert_as_type)); + io_read_buf(b, &p->type, sizeof(enum cert_as_type)); if (p->type == CERT_AS_RANGE) { - io_simple_read(fd, &p->range.min, sizeof(uint32_t)); - io_simple_read(fd, &p->range.max, sizeof(uint32_t)); + io_read_buf(b, &p->range.min, sizeof(uint32_t)); + io_read_buf(b, &p->range.max, sizeof(uint32_t)); } else if (p->type == CERT_AS_ID) - io_simple_read(fd, &p->id, sizeof(uint32_t)); + io_read_buf(b, &p->id, sizeof(uint32_t)); } /* @@ -1316,7 +1314,7 @@ cert_as_read(int fd, struct cert_as *p) * Always returns a valid pointer. */ struct cert * -cert_read(int fd) +cert_read(struct ibuf *b) { struct cert *p; size_t i; @@ -1324,35 +1322,36 @@ cert_read(int fd) if ((p = calloc(1, sizeof(struct cert))) == NULL) err(1, NULL); - io_simple_read(fd, &p->valid, sizeof(int)); - io_simple_read(fd, &p->expires, sizeof(time_t)); - io_simple_read(fd, &p->purpose, sizeof(enum cert_purpose)); - io_simple_read(fd, &p->ipsz, sizeof(size_t)); + io_read_buf(b, &p->valid, sizeof(int)); + io_read_buf(b, &p->expires, sizeof(time_t)); + io_read_buf(b, &p->purpose, sizeof(enum cert_purpose)); + io_read_buf(b, &p->ipsz, sizeof(size_t)); + p->ips = calloc(p->ipsz, sizeof(struct cert_ip)); if (p->ips == NULL) err(1, NULL); for (i = 0; i < p->ipsz; i++) - cert_ip_read(fd, &p->ips[i]); + cert_ip_read(b, &p->ips[i]); - io_simple_read(fd, &p->asz, sizeof(size_t)); + io_read_buf(b, &p->asz, sizeof(size_t)); p->as = calloc(p->asz, sizeof(struct cert_as)); if (p->as == NULL) err(1, NULL); for (i = 0; i < p->asz; i++) - cert_as_read(fd, &p->as[i]); + cert_as_read(b, &p->as[i]); + + io_read_str(b, &p->mft); + io_read_str(b, &p->notify); + io_read_str(b, &p->repo); + io_read_str(b, &p->crl); + io_read_str(b, &p->aia); + io_read_str(b, &p->aki); + io_read_str(b, &p->ski); + io_read_str(b, &p->tal); + io_read_str(b, &p->pubkey); - io_str_read(fd, &p->mft); assert(p->mft != NULL || p->purpose == CERT_PURPOSE_BGPSEC_ROUTER); - io_str_read(fd, &p->notify); - io_str_read(fd, &p->repo); - io_str_read(fd, &p->crl); - io_str_read(fd, &p->aia); - io_str_read(fd, &p->aki); - io_str_read(fd, &p->ski); assert(p->ski); - io_str_read(fd, &p->tal); - io_str_read(fd, &p->pubkey); - return p; } diff --git a/usr.sbin/rpki-client/extern.h b/usr.sbin/rpki-client/extern.h index 1923756216d..661be5e20eb 100644 --- a/usr.sbin/rpki-client/extern.h +++ b/usr.sbin/rpki-client/extern.h @@ -1,4 +1,4 @@ -/* $OpenBSD: extern.h,v 1.73 2021/10/22 11:13:06 claudio Exp $ */ +/* $OpenBSD: extern.h,v 1.74 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -399,25 +399,25 @@ void tal_buffer(struct ibuf *, const struct tal *); void tal_free(struct tal *); struct tal *tal_parse(const char *, char *); char *tal_read_file(const char *); -struct tal *tal_read(int); +struct tal *tal_read(struct ibuf *); void cert_buffer(struct ibuf *, const struct cert *); void cert_free(struct cert *); struct cert *cert_parse(X509 **, const char *); struct cert *ta_parse(X509 **, const char *, const unsigned char *, size_t); -struct cert *cert_read(int); +struct cert *cert_read(struct ibuf *); void cert_insert_brks(struct brk_tree *, struct cert *); void mft_buffer(struct ibuf *, const struct mft *); void mft_free(struct mft *); struct mft *mft_parse(X509 **, const char *); int mft_check(const char *, struct mft *); -struct mft *mft_read(int); +struct mft *mft_read(struct ibuf *); void roa_buffer(struct ibuf *, const struct roa *); void roa_free(struct roa *); struct roa *roa_parse(X509 **, const char *); -struct roa *roa_read(int); +struct roa *roa_read(struct ibuf *); void roa_insert_vrps(struct vrp_tree *, struct roa *, size_t *, size_t *); @@ -460,8 +460,8 @@ void ip_addr_print(const struct ip_addr *, enum afi, char *, void ip_addr_buffer(struct ibuf *, const struct ip_addr *); void ip_addr_range_buffer(struct ibuf *, const struct ip_addr_range *); -void ip_addr_read(int, struct ip_addr *); -void ip_addr_range_read(int, struct ip_addr_range *); +void ip_addr_read(struct ibuf *, struct ip_addr *); +void ip_addr_range_read(struct ibuf *, struct ip_addr_range *); int ip_addr_cmp(const struct ip_addr *, const struct ip_addr *); int ip_addr_check_overlap(const struct cert_ip *, const char *, const struct cert_ip *, size_t); @@ -480,7 +480,7 @@ int as_check_covered(uint32_t, uint32_t, /* Parser-specific */ void entity_free(struct entity *); -void entity_read_req(int fd, struct entity *); +void entity_read_req(struct ibuf *, struct entity *); void entityq_flush(struct entityq *, struct repo *); void proc_parser(int) __attribute__((noreturn)); @@ -535,8 +535,6 @@ char *hex_encode(const unsigned char *, size_t); /* Functions for moving data between processes. */ -void io_socket_blocking(int); -void io_socket_nonblocking(int); struct ibuf *io_buf_new(void); void io_simple_buffer(struct ibuf *, const void *, size_t); void io_buf_buffer(struct ibuf *, const void *, size_t); @@ -545,7 +543,11 @@ void io_buf_close(struct msgbuf *, struct ibuf *); void io_simple_read(int, void *, size_t); void io_buf_read_alloc(int, void **, size_t *); void io_str_read(int, char **); -int io_recvfd(int, void *, size_t); +void io_read_buf(struct ibuf *, void *, size_t); +void io_read_str(struct ibuf *, char **); +void io_read_buf_alloc(struct ibuf *, void **, size_t *); +struct ibuf *io_buf_read(int, struct ibuf **); +struct ibuf *io_buf_recvfd(int, struct ibuf **); /* X509 helpers. */ diff --git a/usr.sbin/rpki-client/http.c b/usr.sbin/rpki-client/http.c index ffd56a16d4f..4907904950f 100644 --- a/usr.sbin/rpki-client/http.c +++ b/usr.sbin/rpki-client/http.c @@ -1,4 +1,4 @@ -/* $OpenBSD: http.c,v 1.43 2021/10/22 11:13:06 claudio Exp $ */ +/* $OpenBSD: http.c,v 1.44 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2020 Nils Fisher * Copyright (c) 2020 Claudio Jeker @@ -1769,6 +1769,7 @@ proc_http(char *bind_addr, int fd) struct pollfd pfds[NPFDS]; struct http_connection *conn, *nc; struct http_request *req, *nr; + struct ibuf *b, *inbuf = NULL; if (bind_addr != NULL) { struct addrinfo hints, *res; @@ -1859,18 +1860,20 @@ proc_http(char *bind_addr, int fd) } } if (pfds[0].revents & POLLIN) { - size_t id, size; - int outfd; - char *uri; - char *mod; - - outfd = io_recvfd(fd, &size, sizeof(size)); - io_simple_read(fd, &id, sizeof(id)); - io_str_read(fd, &uri); - io_str_read(fd, &mod); - - /* queue up new requests */ - http_req_new(id, uri, mod, outfd); + b = io_buf_recvfd(fd, &inbuf); + if (b != NULL) { + size_t id; + char *uri; + char *mod; + + io_read_buf(b, &id, sizeof(id)); + io_read_str(b, &uri); + io_read_str(b, &mod); + + /* queue up new requests */ + http_req_new(id, uri, mod, b->fd); + ibuf_free(b); + } } now = getmonotime(); diff --git a/usr.sbin/rpki-client/io.c b/usr.sbin/rpki-client/io.c index e22423df22d..0d7ab89e64c 100644 --- a/usr.sbin/rpki-client/io.c +++ b/usr.sbin/rpki-client/io.c @@ -1,4 +1,4 @@ -/* $OpenBSD: io.c,v 1.14 2021/10/22 11:13:06 claudio Exp $ */ +/* $OpenBSD: io.c,v 1.15 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -30,28 +30,6 @@ #include "extern.h" -void -io_socket_blocking(int fd) -{ - int fl; - - if ((fl = fcntl(fd, F_GETFL, 0)) == -1) - err(1, "fcntl"); - if (fcntl(fd, F_SETFL, fl & ~O_NONBLOCK) == -1) - err(1, "fcntl"); -} - -void -io_socket_nonblocking(int fd) -{ - int fl; - - if ((fl = fcntl(fd, F_GETFL, 0)) == -1) - err(1, "fcntl"); - if (fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1) - err(1, "fcntl"); -} - /* * Create new io buffer, call io_close() when done with it. * Function always returns a new buffer. @@ -109,80 +87,148 @@ io_buf_close(struct msgbuf *msgbuf, struct ibuf *b) { size_t len; - len = ibuf_size(b); + len = ibuf_size(b) - sizeof(len); memcpy(ibuf_seek(b, 0, sizeof(len)), &len, sizeof(len)); ibuf_close(msgbuf, b); } /* - * Read of a binary buffer that must be on a blocking descriptor. + * Read of an ibuf and extract sz byte from there. * Does nothing if "sz" is zero. - * This will fail and exit on EOF. + * Return 1 on success or 0 if there was not enough data. */ void -io_simple_read(int fd, void *res, size_t sz) +io_read_buf(struct ibuf *b, void *res, size_t sz) { - ssize_t ssz; char *tmp; - tmp = res; /* arithmetic on a pointer to void is a GNU extension */ -again: if (sz == 0) return; - if ((ssz = read(fd, tmp, sz)) == -1) - err(1, "read"); - else if (ssz == 0) - errx(1, "read: unexpected end of file"); - else if ((size_t)ssz == sz) + tmp = ibuf_seek(b, b->rpos, sz); + if (tmp == NULL) + errx(1, "bad internal framing, buffer too short"); + b->rpos += sz; + memcpy(res, tmp, sz); +} + +/* + * Read a string (returns NULL for zero-length strings), allocating + * space for it. + * Return 1 on success or 0 if there was not enough data. + */ +void +io_read_str(struct ibuf *b, char **res) +{ + size_t sz; + + io_read_buf(b, &sz, sizeof(sz)); + if (sz == 0) { + *res = NULL; return; - sz -= ssz; - tmp += ssz; - goto again; + } + if ((*res = calloc(sz + 1, 1)) == NULL) + err(1, NULL); + io_read_buf(b, *res, sz); } /* * Read a binary buffer, allocating space for it. * If the buffer is zero-sized, this won't allocate "res", but * will still initialise it to NULL. + * Return 1 on success or 0 if there was not enough data. */ void -io_buf_read_alloc(int fd, void **res, size_t *sz) +io_read_buf_alloc(struct ibuf *b, void **res, size_t *sz) { - *res = NULL; - io_simple_read(fd, sz, sizeof(size_t)); + io_read_buf(b, sz, sizeof(sz)); if (*sz == 0) return; if ((*res = malloc(*sz)) == NULL) err(1, NULL); - io_simple_read(fd, *res, *sz); + io_read_buf(b, *res, *sz); +} + +/* XXX copy from imsg-buffer.c */ +static int +ibuf_realloc(struct ibuf *buf, size_t len) +{ + unsigned char *b; + + /* on static buffers max is eq size and so the following fails */ + if (buf->wpos + len > buf->max) { + errno = ERANGE; + return (-1); + } + + b = recallocarray(buf->buf, buf->size, buf->wpos + len, 1); + if (b == NULL) + return (-1); + buf->buf = b; + buf->size = buf->wpos + len; + + return (0); } /* - * Read a string (returns NULL for zero-length strings), allocating - * space for it. + * Read once and fill a ibuf until it is finished. + * Returns NULL if more data is needed, returns a full ibuf once + * all data is received. */ -void -io_str_read(int fd, char **res) +struct ibuf * +io_buf_read(int fd, struct ibuf **ib) { - size_t sz; + struct ibuf *b = *ib; + ssize_t n; + size_t sz; - io_simple_read(fd, &sz, sizeof(size_t)); - if (sz == 0) { - *res = NULL; - return; + /* if ibuf == NULL allocate a new buffer */ + if (b == NULL) { + if ((b = ibuf_dynamic(sizeof(sz), INT32_MAX)) == NULL) + err(1, NULL); + *ib = b; } - if ((*res = calloc(sz + 1, 1)) == NULL) - err(1, NULL); - io_simple_read(fd, *res, sz); + + /* read some data */ + while ((n = read(fd, b->buf + b->wpos, b->size - b->wpos)) == -1) { + if (errno == EINTR) + continue; + err(1, "read"); + } + + if (n == 0) + errx(1, "read: unexpected end of file"); + b->wpos += n; + + /* got full message */ + if (b->wpos == b->size) { + /* only header received */ + if (b->wpos == sizeof(sz)) { + memcpy(&sz, b->buf, sizeof(sz)); + if (sz == 0 || sz > INT32_MAX) + errx(1, "bad internal framing, bad size"); + if (ibuf_realloc(b, sz) == -1) + err(1, "ibuf_realloc"); + return NULL; + } + + /* skip over initial size header */ + b->rpos += sizeof(sz); + *ib = NULL; + return b; + } + + return NULL; } + /* * Read data from socket but receive a file descriptor at the same time. */ -int -io_recvfd(int fd, void *res, size_t sz) +struct ibuf * +io_buf_recvfd(int fd, struct ibuf **ib) { + struct ibuf *b = *ib; struct iovec iov; struct msghdr msg; struct cmsghdr *cmsg; @@ -190,15 +236,22 @@ io_recvfd(int fd, void *res, size_t sz) struct cmsghdr hdr; char buf[CMSG_SPACE(sizeof(int))]; } cmsgbuf; - int outfd = -1; - char *b = res; ssize_t n; + size_t sz; + + /* fd are only passed on the head, just use regular read afterwards */ + if (b != NULL) + return io_buf_read(fd, ib); + if ((b = ibuf_dynamic(sizeof(sz), INT32_MAX)) == NULL) + err(1, NULL); + *ib = b; + memset(&msg, 0, sizeof(msg)); memset(&cmsgbuf, 0, sizeof(cmsgbuf)); - iov.iov_base = res; - iov.iov_len = sz; + iov.iov_base = b->buf; + iov.iov_len = b->size; msg.msg_iov = &iov; msg.msg_iovlen = 1; @@ -225,29 +278,32 @@ io_recvfd(int fd, void *res, size_t sz) for (i = 0; i < j; i++) { f = ((int *)CMSG_DATA(cmsg))[i]; if (i == 0) - outfd = f; + b->fd = f; else close(f); } } } - b += n; - sz -= n; - while (sz > 0) { - /* short receive */ - n = recv(fd, b, sz, 0); - if (n == -1) { - if (errno == EINTR) - continue; - err(1, "recv"); + b->wpos += n; + + /* got full message */ + if (b->wpos == b->size) { + /* only header received */ + if (b->wpos == sizeof(sz)) { + memcpy(&sz, b->buf, sizeof(sz)); + if (sz == 0 || sz > INT32_MAX) + errx(1, "read: bad internal framing, %zu", sz); + if (ibuf_realloc(b, sz) == -1) + err(1, "ibuf_realloc"); + return NULL; } - if (n == 0) - errx(1, "recv: unexpected end of file"); - b += n; - sz -= n; + /* skip over initial size header */ + b->rpos += sizeof(sz); + *ib = NULL; + return b; } - return outfd; + return NULL; } diff --git a/usr.sbin/rpki-client/ip.c b/usr.sbin/rpki-client/ip.c index 9a7cc756014..db6d1f2d8a1 100644 --- a/usr.sbin/rpki-client/ip.c +++ b/usr.sbin/rpki-client/ip.c @@ -1,4 +1,4 @@ -/* $OpenBSD: ip.c,v 1.17 2021/04/19 17:04:35 deraadt Exp $ */ +/* $OpenBSD: ip.c,v 1.18 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -314,14 +314,14 @@ ip_addr_range_buffer(struct ibuf *b, const struct ip_addr_range *p) * Matched with ip_addr_buffer(). */ void -ip_addr_read(int fd, struct ip_addr *p) +ip_addr_read(struct ibuf *b, struct ip_addr *p) { size_t sz; - io_simple_read(fd, &p->prefixlen, sizeof(unsigned char)); + io_read_buf(b, &p->prefixlen, sizeof(unsigned char)); sz = PREFIX_SIZE(p->prefixlen); assert(sz <= 16); - io_simple_read(fd, p->addr, sz); + io_read_buf(b, p->addr, sz); } /* @@ -329,11 +329,10 @@ ip_addr_read(int fd, struct ip_addr *p) * Matched with ip_addr_range_buffer(). */ void -ip_addr_range_read(int fd, struct ip_addr_range *p) +ip_addr_range_read(struct ibuf *b, struct ip_addr_range *p) { - - ip_addr_read(fd, &p->min); - ip_addr_read(fd, &p->max); + ip_addr_read(b, &p->min); + ip_addr_read(b, &p->max); } /* diff --git a/usr.sbin/rpki-client/main.c b/usr.sbin/rpki-client/main.c index 558d5837950..e344d37c9ca 100644 --- a/usr.sbin/rpki-client/main.c +++ b/usr.sbin/rpki-client/main.c @@ -1,4 +1,4 @@ -/* $OpenBSD: main.c,v 1.150 2021/10/22 11:13:06 claudio Exp $ */ +/* $OpenBSD: main.c,v 1.151 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -99,17 +99,14 @@ entity_free(struct entity *ent) * The pointer must be passed entity_free(). */ void -entity_read_req(int fd, struct entity *ent) +entity_read_req(struct ibuf *b, struct entity *ent) { - size_t size; - - io_simple_read(fd, &size, sizeof(size)); - io_simple_read(fd, &ent->type, sizeof(enum rtype)); - io_str_read(fd, &ent->file); - io_simple_read(fd, &ent->has_pkey, sizeof(int)); + io_read_buf(b, &ent->type, sizeof(ent->type)); + io_read_str(b, &ent->file); + io_read_buf(b, &ent->has_pkey, sizeof(ent->has_pkey)); if (ent->has_pkey) - io_buf_read_alloc(fd, (void **)&ent->pkey, &ent->pkeysz); - io_str_read(fd, &ent->descr); + io_read_buf_alloc(b, (void **)&ent->pkey, &ent->pkeysz); + io_read_str(b, &ent->descr); } /* @@ -459,7 +456,7 @@ queue_add_from_cert(const struct cert *cert) * In all cases, we gather statistics. */ static void -entity_process(int proc, struct stats *st, struct vrp_tree *tree, +entity_process(struct ibuf *b, struct stats *st, struct vrp_tree *tree, struct brk_tree *brktree) { enum rtype type; @@ -467,7 +464,6 @@ entity_process(int proc, struct stats *st, struct vrp_tree *tree, struct cert *cert; struct mft *mft; struct roa *roa; - size_t size; int c; /* @@ -476,24 +472,23 @@ entity_process(int proc, struct stats *st, struct vrp_tree *tree, * certificate, for example). * We follow that up with whether the resources didn't parse. */ - io_simple_read(proc, &size, sizeof(size)); - io_simple_read(proc, &type, sizeof(type)); + io_read_buf(b, &type, sizeof(type)); switch (type) { case RTYPE_TAL: st->tals++; - tal = tal_read(proc); + tal = tal_read(b); queue_add_from_tal(tal); tal_free(tal); break; case RTYPE_CER: st->certs++; - io_simple_read(proc, &c, sizeof(int)); + io_read_buf(b, &c, sizeof(c)); if (c == 0) { st->certs_fail++; break; } - cert = cert_read(proc); + cert = cert_read(b); if (cert->purpose == CERT_PURPOSE_CA) { if (cert->valid) { /* @@ -517,12 +512,12 @@ entity_process(int proc, struct stats *st, struct vrp_tree *tree, break; case RTYPE_MFT: st->mfts++; - io_simple_read(proc, &c, sizeof(int)); + io_read_buf(b, &c, sizeof(c)); if (c == 0) { st->mfts_fail++; break; } - mft = mft_read(proc); + mft = mft_read(b); if (mft->stale) st->mfts_stale++; queue_add_from_mft_set(mft); @@ -533,12 +528,12 @@ entity_process(int proc, struct stats *st, struct vrp_tree *tree, break; case RTYPE_ROA: st->roas++; - io_simple_read(proc, &c, sizeof(int)); + io_read_buf(b, &c, sizeof(c)); if (c == 0) { st->roas_fail++; break; } - roa = roa_read(proc); + roa = roa_read(b); if (roa->valid) roa_insert_vrps(tree, roa, &st->vrps, &st->uniqs); else @@ -555,6 +550,57 @@ entity_process(int proc, struct stats *st, struct vrp_tree *tree, entity_queue--; } +static void +rrdp_process(struct ibuf *b) +{ + enum rrdp_msg type; + enum publish_type pt; + struct rrdp_session s; + char *uri, *last_mod, *data; + char hash[SHA256_DIGEST_LENGTH]; + size_t dsz, id; + int ok; + + io_read_buf(b, &type, sizeof(type)); + io_read_buf(b, &id, sizeof(id)); + + switch (type) { + case RRDP_END: + io_read_buf(b, &ok, sizeof(ok)); + rrdp_finish(id, ok); + break; + case RRDP_HTTP_REQ: + io_read_str(b, &uri); + io_read_str(b, &last_mod); + rrdp_http_fetch(id, uri, last_mod); + break; + case RRDP_SESSION: + io_read_str(b, &s.session_id); + io_read_buf(b, &s.serial, sizeof(s.serial)); + io_read_str(b, &s.last_mod); + rrdp_save_state(id, &s); + free(s.session_id); + free(s.last_mod); + break; + case RRDP_FILE: + io_read_buf(b, &pt, sizeof(pt)); + if (pt != PUB_ADD) + io_read_buf(b, &hash, sizeof(hash)); + io_read_str(b, &uri); + io_read_buf_alloc(b, (void **)&data, &dsz); + + ok = rrdp_handle_file(id, pt, uri, hash, sizeof(hash), + data, dsz); + rrdp_file_resp(id, ok); + + free(uri); + free(data); + break; + default: + errx(1, "unexpected rrdp response"); + } +} + /* * Assign filenames ending in ".tal" in "/etc/rpki" into "tals", * returning the number of files found and filled-in. @@ -623,19 +669,21 @@ suicide(int sig __attribute__((unused))) int main(int argc, char *argv[]) { - int rc, c, st, proc, rsync, http, rrdp, ok, - hangup = 0, fl = SOCK_STREAM | SOCK_CLOEXEC; - size_t i, id, talsz = 0, size; + int rc, c, st, proc, rsync, http, rrdp, ok, hangup = 0; + int fl = SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK; + size_t i, id, talsz = 0; pid_t pid, procpid, rsyncpid, httppid, rrdppid; int fd[2]; struct pollfd pfd[NPFD]; struct msgbuf *queues[NPFD]; + struct ibuf *b, *httpbuf = NULL, *procbuf = NULL; + struct ibuf *rrdpbuf = NULL, *rsyncbuf = NULL; char *rsync_prog = "openrsync"; char *bind_addr = NULL; const char *cachedir = NULL, *outputdir = NULL; const char *tals[TALSZ_MAX], *errs, *name; - struct vrp_tree v = RB_INITIALIZER(&v); - struct brk_tree b = RB_INITIALIZER(&b); + struct vrp_tree vrps = RB_INITIALIZER(&vrps); + struct brk_tree brks = RB_INITIALIZER(&brks); struct rusage ru; struct timeval start_time, now_time; @@ -972,12 +1020,6 @@ main(int argc, char *argv[]) hangup = 1; } if (pfd[i].revents & POLLOUT) { - /* - * XXX work around deadlocks because of - * blocking read vs non-blocking writes. - */ - if (i > 1) - io_socket_nonblocking(pfd[i].fd); switch (msgbuf_write(queues[i])) { case 0: errx(1, "write[%zu]: " @@ -985,8 +1027,6 @@ main(int argc, char *argv[]) case -1: err(1, "write[%zu]", i); } - if (i > 1) - io_socket_blocking(pfd[i].fd); } } if (hangup) @@ -1000,75 +1040,38 @@ main(int argc, char *argv[]) */ if ((pfd[1].revents & POLLIN)) { - io_simple_read(rsync, &size, sizeof(size)); - io_simple_read(rsync, &id, sizeof(id)); - io_simple_read(rsync, &ok, sizeof(ok)); - rsync_finish(id, ok); + b = io_buf_read(rsync, &rsyncbuf); + if (b != NULL) { + io_read_buf(b, &id, sizeof(id)); + io_read_buf(b, &ok, sizeof(ok)); + rsync_finish(id, ok); + ibuf_free(b); + } } if ((pfd[2].revents & POLLIN)) { - enum http_result res; - char *last_mod; - - io_simple_read(http, &size, sizeof(size)); - io_simple_read(http, &id, sizeof(id)); - io_simple_read(http, &res, sizeof(res)); - io_str_read(http, &last_mod); - http_finish(id, res, last_mod); - free(last_mod); + b = io_buf_read(http, &httpbuf); + if (b != NULL) { + enum http_result res; + char *last_mod; + + io_read_buf(b, &id, sizeof(id)); + io_read_buf(b, &res, sizeof(res)); + io_read_str(b, &last_mod); + http_finish(id, res, last_mod); + free(last_mod); + ibuf_free(b); + } } /* * Handle RRDP requests here. */ if ((pfd[3].revents & POLLIN)) { - enum rrdp_msg type; - enum publish_type pt; - struct rrdp_session s; - char *uri, *last_mod, *data; - char hash[SHA256_DIGEST_LENGTH]; - size_t dsz; - - io_simple_read(rrdp, &size, sizeof(size)); - io_simple_read(rrdp, &type, sizeof(type)); - io_simple_read(rrdp, &id, sizeof(id)); - - switch (type) { - case RRDP_END: - io_simple_read(rrdp, &ok, sizeof(ok)); - rrdp_finish(id, ok); - break; - case RRDP_HTTP_REQ: - io_str_read(rrdp, &uri); - io_str_read(rrdp, &last_mod); - rrdp_http_fetch(id, uri, last_mod); - break; - case RRDP_SESSION: - io_str_read(rrdp, &s.session_id); - io_simple_read(rrdp, &s.serial, - sizeof(s.serial)); - io_str_read(rrdp, &s.last_mod); - rrdp_save_state(id, &s); - free(s.session_id); - free(s.last_mod); - break; - case RRDP_FILE: - io_simple_read(rrdp, &pt, sizeof(pt)); - if (pt != PUB_ADD) - io_simple_read(rrdp, &hash, - sizeof(hash)); - io_str_read(rrdp, &uri); - io_buf_read_alloc(rrdp, (void **)&data, &dsz); - - ok = rrdp_handle_file(id, pt, uri, - hash, sizeof(hash), data, dsz); - rrdp_file_resp(id, ok); - - free(uri); - free(data); - break; - default: - errx(1, "unexpected rrdp response"); + b = io_buf_read(rrdp, &rrdpbuf); + if (b != NULL) { + rrdp_process(b); + ibuf_free(b); } } @@ -1078,7 +1081,11 @@ main(int argc, char *argv[]) */ if ((pfd[0].revents & POLLIN)) { - entity_process(proc, &stats, &v, &b); + b = io_buf_read(proc, &procbuf); + if (b != NULL) { + entity_process(b, &stats, &vrps, &brks); + ibuf_free(b); + } } } @@ -1154,7 +1161,7 @@ main(int argc, char *argv[]) if (fchdir(outdirfd) == -1) err(1, "fchdir output dir"); - if (outputfiles(&v, &b, &stats)) + if (outputfiles(&vrps, &brks, &stats)) rc = 1; diff --git a/usr.sbin/rpki-client/mft.c b/usr.sbin/rpki-client/mft.c index f80d2321eb2..0fc6f005be9 100644 --- a/usr.sbin/rpki-client/mft.c +++ b/usr.sbin/rpki-client/mft.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mft.c,v 1.38 2021/09/09 14:15:49 claudio Exp $ */ +/* $OpenBSD: mft.c,v 1.39 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -564,7 +564,7 @@ mft_buffer(struct ibuf *b, const struct mft *p) * Result must be passed to mft_free(). */ struct mft * -mft_read(int fd) +mft_read(struct ibuf *b) { struct mft *p = NULL; size_t i; @@ -572,22 +572,22 @@ mft_read(int fd) if ((p = calloc(1, sizeof(struct mft))) == NULL) err(1, NULL); - io_simple_read(fd, &p->stale, sizeof(int)); - io_str_read(fd, &p->file); - assert(p->file); - io_simple_read(fd, &p->filesz, sizeof(size_t)); + io_read_buf(b, &p->stale, sizeof(int)); + io_read_str(b, &p->file); + io_read_buf(b, &p->filesz, sizeof(size_t)); + assert(p->file); if ((p->files = calloc(p->filesz, sizeof(struct mftfile))) == NULL) err(1, NULL); for (i = 0; i < p->filesz; i++) { - io_str_read(fd, &p->files[i].file); - io_simple_read(fd, p->files[i].hash, SHA256_DIGEST_LENGTH); + io_read_str(b, &p->files[i].file); + io_read_buf(b, p->files[i].hash, SHA256_DIGEST_LENGTH); } - io_str_read(fd, &p->aia); - io_str_read(fd, &p->aki); - io_str_read(fd, &p->ski); + io_read_str(b, &p->aia); + io_read_str(b, &p->aki); + io_read_str(b, &p->ski); assert(p->aia && p->aki && p->ski); return p; diff --git a/usr.sbin/rpki-client/parser.c b/usr.sbin/rpki-client/parser.c index 796879094a1..5b2a96abb4a 100644 --- a/usr.sbin/rpki-client/parser.c +++ b/usr.sbin/rpki-client/parser.c @@ -1,4 +1,4 @@ -/* $OpenBSD: parser.c,v 1.14 2021/10/22 11:13:06 claudio Exp $ */ +/* $OpenBSD: parser.c,v 1.15 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2019 Claudio Jeker * Copyright (c) 2019 Kristaps Dzonsons @@ -525,7 +525,7 @@ proc_parser(int fd) struct entityq q; struct msgbuf msgq; struct pollfd pfd; - struct ibuf *b; + struct ibuf *b, *inbuf = NULL; X509_STORE_CTX *ctx; struct auth_tree auths = RB_INITIALIZER(&auths); struct crl_tree crlt = RB_INITIALIZER(&crlt); @@ -545,8 +545,6 @@ proc_parser(int fd) pfd.fd = fd; - io_socket_nonblocking(pfd.fd); - for (;;) { pfd.events = POLLIN; if (msgq.queued) @@ -571,13 +569,16 @@ proc_parser(int fd) */ if ((pfd.revents & POLLIN)) { - io_socket_blocking(fd); - entp = calloc(1, sizeof(struct entity)); - if (entp == NULL) - err(1, NULL); - entity_read_req(fd, entp); - TAILQ_INSERT_TAIL(&q, entp, entries); - io_socket_nonblocking(fd); + b = io_buf_read(fd, &inbuf); + + if (b != NULL) { + entp = calloc(1, sizeof(struct entity)); + if (entp == NULL) + err(1, NULL); + entity_read_req(b, entp); + TAILQ_INSERT_TAIL(&q, entp, entries); + ibuf_free(b); + } } if (pfd.revents & POLLOUT) { diff --git a/usr.sbin/rpki-client/roa.c b/usr.sbin/rpki-client/roa.c index c1fdc1a0fad..47a45078575 100644 --- a/usr.sbin/rpki-client/roa.c +++ b/usr.sbin/rpki-client/roa.c @@ -1,4 +1,4 @@ -/* $OpenBSD: roa.c,v 1.26 2021/10/07 08:28:45 claudio Exp $ */ +/* $OpenBSD: roa.c,v 1.27 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -448,7 +448,7 @@ roa_buffer(struct ibuf *b, const struct roa *p) * Result must be passed to roa_free(). */ struct roa * -roa_read(int fd) +roa_read(struct ibuf *b) { struct roa *p; size_t i; @@ -456,26 +456,26 @@ roa_read(int fd) if ((p = calloc(1, sizeof(struct roa))) == NULL) err(1, NULL); - io_simple_read(fd, &p->valid, sizeof(int)); - io_simple_read(fd, &p->asid, sizeof(uint32_t)); - io_simple_read(fd, &p->ipsz, sizeof(size_t)); - io_simple_read(fd, &p->expires, sizeof(time_t)); + io_read_buf(b, &p->valid, sizeof(int)); + io_read_buf(b, &p->asid, sizeof(uint32_t)); + io_read_buf(b, &p->ipsz, sizeof(size_t)); + io_read_buf(b, &p->expires, sizeof(time_t)); if ((p->ips = calloc(p->ipsz, sizeof(struct roa_ip))) == NULL) err(1, NULL); for (i = 0; i < p->ipsz; i++) { - io_simple_read(fd, &p->ips[i].afi, sizeof(enum afi)); - io_simple_read(fd, &p->ips[i].maxlength, sizeof(size_t)); - io_simple_read(fd, &p->ips[i].min, sizeof(p->ips[i].min)); - io_simple_read(fd, &p->ips[i].max, sizeof(p->ips[i].max)); - ip_addr_read(fd, &p->ips[i].addr); + io_read_buf(b, &p->ips[i].afi, sizeof(enum afi)); + io_read_buf(b, &p->ips[i].maxlength, sizeof(size_t)); + io_read_buf(b, &p->ips[i].min, sizeof(p->ips[i].min)); + io_read_buf(b, &p->ips[i].max, sizeof(p->ips[i].max)); + ip_addr_read(b, &p->ips[i].addr); } - io_str_read(fd, &p->aia); - io_str_read(fd, &p->aki); - io_str_read(fd, &p->ski); - io_str_read(fd, &p->tal); + io_read_str(b, &p->aia); + io_read_str(b, &p->aki); + io_read_str(b, &p->ski); + io_read_str(b, &p->tal); assert(p->aia && p->aki && p->ski && p->tal); return p; diff --git a/usr.sbin/rpki-client/rrdp.c b/usr.sbin/rpki-client/rrdp.c index e1bb138fc84..31c061073f8 100644 --- a/usr.sbin/rpki-client/rrdp.c +++ b/usr.sbin/rpki-client/rrdp.c @@ -1,4 +1,4 @@ -/* $OpenBSD: rrdp.c,v 1.12 2021/10/22 11:13:06 claudio Exp $ */ +/* $OpenBSD: rrdp.c,v 1.13 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2020 Nils Fisher * Copyright (c) 2021 Claudio Jeker @@ -378,32 +378,37 @@ rrdp_finished(struct rrdp *s) static void rrdp_input_handler(int fd) { + static struct ibuf *inbuf; char *local, *notify, *session_id, *last_mod; + struct ibuf *b; struct rrdp *s; enum rrdp_msg type; enum http_result res; long long serial; - size_t id, size; - int infd, ok; + size_t id; + int ok; + + b = io_buf_recvfd(fd, &inbuf); + if (b == NULL) + return; - infd = io_recvfd(fd, &size, sizeof(size)); - io_simple_read(fd, &type, sizeof(type)); - io_simple_read(fd, &id, sizeof(id)); + io_read_buf(b, &type, sizeof(type)); + io_read_buf(b, &id, sizeof(id)); switch (type) { case RRDP_START: - io_str_read(fd, &local); - io_str_read(fd, ¬ify); - io_str_read(fd, &session_id); - io_simple_read(fd, &serial, sizeof(serial)); - io_str_read(fd, &last_mod); - if (infd != -1) - errx(1, "received unexpected fd %d", infd); + io_read_str(b, &local); + io_read_str(b, ¬ify); + io_read_str(b, &session_id); + io_read_buf(b, &serial, sizeof(serial)); + io_read_str(b, &last_mod); + if (b->fd != -1) + errx(1, "received unexpected fd"); s = rrdp_new(id, local, notify, session_id, serial, last_mod); break; case RRDP_HTTP_INI: - if (infd == -1) + if (b->fd == -1) errx(1, "expected fd not received"); s = rrdp_get(id); if (s == NULL) @@ -411,13 +416,13 @@ rrdp_input_handler(int fd) if (s->state != RRDP_STATE_WAIT) errx(1, "%s: bad internal state", s->local); - s->infd = infd; + s->infd = b->fd; s->state = RRDP_STATE_PARSE; break; case RRDP_HTTP_FIN: - io_simple_read(fd, &res, sizeof(res)); - io_str_read(fd, &last_mod); - if (infd != -1) + io_read_buf(b, &res, sizeof(res)); + io_read_str(b, &last_mod); + if (b->fd != -1) errx(1, "received unexpected fd"); s = rrdp_get(id); @@ -435,12 +440,11 @@ rrdp_input_handler(int fd) s = rrdp_get(id); if (s == NULL) errx(1, "rrdp session %zu does not exist", id); - if (infd != -1) - errx(1, "received unexpected fd %d", infd); - io_simple_read(fd, &ok, sizeof(ok)); - if (ok != 1) { + if (b->fd != -1) + errx(1, "received unexpected fd"); + io_read_buf(b, &ok, sizeof(ok)); + if (ok != 1) s->file_failed++; - } s->file_pending--; if (s->file_pending == 0) rrdp_finished(s); @@ -448,6 +452,7 @@ rrdp_input_handler(int fd) default: errx(1, "unexpected message %d", type); } + ibuf_free(b); } static void @@ -558,14 +563,12 @@ proc_rrdp(int fd) if (pfds[0].revents & POLLHUP) break; if (pfds[0].revents & POLLOUT) { - io_socket_nonblocking(fd); switch (msgbuf_write(&msgq)) { case 0: errx(1, "write: connection closed"); case -1: err(1, "write"); } - io_socket_blocking(fd); } if (pfds[0].revents & POLLIN) rrdp_input_handler(fd); diff --git a/usr.sbin/rpki-client/rsync.c b/usr.sbin/rpki-client/rsync.c index 6b115447cdb..839250b03e7 100644 --- a/usr.sbin/rpki-client/rsync.c +++ b/usr.sbin/rpki-client/rsync.c @@ -1,4 +1,4 @@ -/* $OpenBSD: rsync.c,v 1.26 2021/10/22 11:13:06 claudio Exp $ */ +/* $OpenBSD: rsync.c,v 1.27 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -120,6 +120,7 @@ proc_rsync(char *prog, char *bind_addr, int fd) int rc = 0; struct pollfd pfd; struct msgbuf msgq; + struct ibuf *b, *inbuf = NULL; sigset_t mask, oldmask; struct rsyncproc *ids = NULL; @@ -178,7 +179,7 @@ proc_rsync(char *prog, char *bind_addr, int fd) for (;;) { char *uri = NULL, *dst = NULL; - size_t id, size; + size_t id; pid_t pid; int st; @@ -198,7 +199,6 @@ proc_rsync(char *prog, char *bind_addr, int fd) */ while ((pid = waitpid(WAIT_ANY, &st, WNOHANG)) > 0) { - struct ibuf *b; int ok = 1; for (i = 0; i < idsz; i++) @@ -247,11 +247,17 @@ proc_rsync(char *prog, char *bind_addr, int fd) if (!(pfd.revents & POLLIN)) continue; + b = io_buf_read(fd, &inbuf); + if (b == NULL) + continue; + /* Read host and module. */ - io_simple_read(fd, &size, sizeof(size)); - io_simple_read(fd, &id, sizeof(id)); - io_str_read(fd, &dst); - io_str_read(fd, &uri); + io_read_buf(b, &id, sizeof(id)); + io_read_str(b, &dst); + io_read_str(b, &uri); + + ibuf_free(b); + assert(dst); assert(uri); diff --git a/usr.sbin/rpki-client/tal.c b/usr.sbin/rpki-client/tal.c index 1ce936e1c8c..b6a0890f82b 100644 --- a/usr.sbin/rpki-client/tal.c +++ b/usr.sbin/rpki-client/tal.c @@ -1,4 +1,4 @@ -/* $OpenBSD: tal.c,v 1.30 2021/04/01 06:43:23 claudio Exp $ */ +/* $OpenBSD: tal.c,v 1.31 2021/10/23 16:06:04 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons * @@ -284,7 +284,7 @@ tal_buffer(struct ibuf *b, const struct tal *p) * A returned pointer must be freed with tal_free(). */ struct tal * -tal_read(int fd) +tal_read(struct ibuf *b) { size_t i; struct tal *p; @@ -292,18 +292,18 @@ tal_read(int fd) if ((p = calloc(1, sizeof(struct tal))) == NULL) err(1, NULL); - io_buf_read_alloc(fd, (void **)&p->pkey, &p->pkeysz); + io_read_buf_alloc(b, (void **)&p->pkey, &p->pkeysz); + io_read_str(b, &p->descr); + io_read_buf(b, &p->urisz, sizeof(size_t)); assert(p->pkeysz > 0); - io_str_read(fd, &p->descr); assert(p->descr); - io_simple_read(fd, &p->urisz, sizeof(size_t)); assert(p->urisz > 0); if ((p->uri = calloc(p->urisz, sizeof(char *))) == NULL) err(1, NULL); for (i = 0; i < p->urisz; i++) { - io_str_read(fd, &p->uri[i]); + io_read_str(b, &p->uri[i]); assert(p->uri[i]); }