-/* $OpenBSD: control.c,v 1.117 2024/04/22 09:36:04 claudio Exp $ */
+/* $OpenBSD: control.c,v 1.118 2024/08/20 11:59:39 claudio Exp $ */
/*
* Copyright (c) 2003, 2004 Henning Brauer <henning@openbsd.org>
p->stats.prefix_sent_eor = stats.prefix_sent_eor;
p->stats.pending_update = stats.pending_update;
p->stats.pending_withdraw = stats.pending_withdraw;
+ p->stats.msg_queue_len = msgbuf_queuelen(&p->wbuf);
return imsg_compose(&c->imsgbuf, type, 0, pid, -1,
p, sizeof(*p));
-/* $OpenBSD: rde.c,v 1.626 2024/08/14 19:09:51 claudio Exp $ */
+/* $OpenBSD: rde.c,v 1.627 2024/08/20 11:59:39 claudio Exp $ */
/*
* Copyright (c) 2003, 2004 Henning Brauer <henning@openbsd.org>
if (i >= pfd_elms)
fatalx("poll pfd too small");
- if (mctx->mrt.wbuf.queued) {
+ if (msgbuf_queuelen(&mctx->mrt.wbuf) > 0) {
pfd[i].fd = mctx->mrt.wbuf.fd;
pfd[i].events = POLLOUT;
i++;
{
struct mrt *mrt = arg;
- return (mrt->wbuf.queued > SESS_MSG_LOW_MARK);
+ return (msgbuf_queuelen(&mrt->wbuf) > SESS_MSG_LOW_MARK);
}
static void
-/* $OpenBSD: rtr_proto.c,v 1.38 2024/08/12 09:04:23 claudio Exp $ */
+/* $OpenBSD: rtr_proto.c,v 1.39 2024/08/20 11:59:39 claudio Exp $ */
/*
* Copyright (c) 2020 Claudio Jeker <claudio@openbsd.org>
rtr_fsm(rs, RTR_EVNT_CON_CLOSE);
return;
}
- if (pfd->revents & POLLOUT && rs->w.queued) {
+ if (pfd->revents & POLLOUT && msgbuf_queuelen(&rs->w) > 0) {
if ((error = ibuf_write(&rs->w)) == -1) {
if (errno != EAGAIN) {
log_warn("rtr %s: write error", log_rtr(rs));
}
if (error == 0)
rtr_fsm(rs, RTR_EVNT_CON_CLOSE);
- if (rs->w.queued == 0 && rs->state == RTR_STATE_ERROR)
+ if (rs->state == RTR_STATE_ERROR &&
+ msgbuf_queuelen(&rs->w) == 0)
rtr_fsm(rs, RTR_EVNT_CON_CLOSE);
}
if (pfd->revents & POLLIN) {
pfd->fd = rs->fd;
pfd->events = 0;
- if (rs->w.queued)
+ if (msgbuf_queuelen(&rs->w) > 0)
pfd->events |= POLLOUT;
if (rs->state >= RTR_STATE_ESTABLISHED)
pfd->events |= POLLIN;
-/* $OpenBSD: session.c,v 1.480 2024/06/10 12:51:25 claudio Exp $ */
+/* $OpenBSD: session.c,v 1.481 2024/08/20 11:59:39 claudio Exp $ */
/*
* Copyright (c) 2003, 2004, 2005 Henning Brauer <henning@openbsd.org>
free(m);
continue;
}
- if (m->wbuf.queued)
+ if (msgbuf_queuelen(&m->wbuf) > 0)
mrt_cnt++;
}
/* are we waiting for a write? */
events = POLLIN;
- if (p->wbuf.queued > 0 || p->state == STATE_CONNECT)
+ if (msgbuf_queuelen(&p->wbuf) > 0 ||
+ p->state == STATE_CONNECT)
events |= POLLOUT;
/* is there still work to do? */
if (p->rpending && p->rbuf && p->rbuf->wpos)
idx_peers = i;
LIST_FOREACH(m, &mrthead, entry)
- if (m->wbuf.queued) {
+ if (msgbuf_queuelen(&m->wbuf) > 0) {
pfd[i].fd = m->wbuf.fd;
pfd[i].events = POLLOUT;
mrt_l[i - idx_peers] = m;
* try to write out what's buffered (maybe a notification),
* don't bother if it fails
*/
- if (peer->state >= STATE_OPENSENT && peer->wbuf.queued)
+ if (peer->state >= STATE_OPENSENT &&
+ msgbuf_queuelen(&peer->wbuf) > 0)
msgbuf_write(&peer->wbuf);
/*
}
ibuf_close(&p->wbuf, msg->buf);
- if (!p->throttled && p->wbuf.queued > SESS_MSG_HIGH_MARK) {
+ if (!p->throttled && msgbuf_queuelen(&p->wbuf) > SESS_MSG_HIGH_MARK) {
if (imsg_rde(IMSG_XOFF, p->conf.id, NULL, 0) == -1)
log_peer_warn(&p->conf, "imsg_compose XOFF");
else
return (1);
}
- if (pfd->revents & POLLOUT && p->wbuf.queued) {
+ if (pfd->revents & POLLOUT && msgbuf_queuelen(&p->wbuf) > 0) {
if ((error = msgbuf_write(&p->wbuf)) <= 0 && errno != EAGAIN) {
if (error == 0)
log_peer_warnx(&p->conf, "Connection closed");
}
p->stats.last_write = getmonotime();
start_timer_sendholdtime(p);
- if (p->throttled && p->wbuf.queued < SESS_MSG_LOW_MARK) {
+ if (p->throttled &&
+ msgbuf_queuelen(&p->wbuf) < SESS_MSG_LOW_MARK) {
if (imsg_rde(IMSG_XON, p->conf.id, NULL, 0) == -1)
log_peer_warn(&p->conf, "imsg_compose XON");
else
-/* $OpenBSD: session.h,v 1.171 2024/08/12 09:04:23 claudio Exp $ */
+/* $OpenBSD: session.h,v 1.172 2024/08/20 11:59:39 claudio Exp $ */
/*
* Copyright (c) 2003, 2004 Henning Brauer <henning@openbsd.org>
time_t last_updown;
time_t last_read;
time_t last_write;
+ uint32_t msg_queue_len;
uint32_t prefix_cnt;
uint32_t prefix_out_cnt;
uint32_t pending_update;