in rpki-client. For now it is off by default.
All XML processing is done in its own process with minimal pledge rights.
It uses the already present https process to fetch the xml files and uses
the master porcess to handle the file IO into the repositories.
RRDP data is stored in the cache under ./rrdp/ and the first directory
is the SHA256 hash of the notify URI.
Fetching snapshots and deltas works to bring the cache up to date.
If something goes wrong rpki-client will fall back to rsync.
RRDP was implemented by Nils Fisher and integrated into rpki-client by myself.
"Time to get it in" deraadt@
-# $OpenBSD: Makefile,v 1.20 2021/04/01 06:43:23 claudio Exp $
+# $OpenBSD: Makefile,v 1.21 2021/04/01 16:04:48 claudio Exp $
PROG= rpki-client
SRCS= as.c cert.c cms.c crl.c encoding.c gbr.c http.c io.c ip.c log.c \
main.c mft.c mkdir.c output.c output-bgpd.c output-bird.c \
- output-csv.c output-json.c parser.c roa.c rsync.c tal.c validate.c \
+ output-csv.c output-json.c parser.c repo.c roa.c rrdp.c rrdp_delta.c \
+ rrdp_notification.c rrdp_snapshot.c rsync.c tal.c validate.c \
x509.c
MAN= rpki-client.8
-LDADD+= -ltls -lssl -lcrypto -lutil
-DPADD+= ${LIBTLS} ${LIBSSL} ${LIBCRYPTO} ${LIBUTIL}
+LDADD+= -lexpat -ltls -lssl -lcrypto -lutil
+DPADD+= ${LIBEXPAT} ${LIBTLS} ${LIBSSL} ${LIBCRYPTO} ${LIBUTIL}
CFLAGS+= -Wall -I${.CURDIR}
CFLAGS+= -Wstrict-prototypes -Wmissing-prototypes
-/* $OpenBSD: extern.h,v 1.61 2021/04/01 06:53:49 claudio Exp $ */
+/* $OpenBSD: extern.h,v 1.62 2021/04/01 16:04:48 claudio Exp $ */
/*
* Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
*
HTTP_NOT_MOD, /* 304 Not Modified */
};
+/*
+ * Message types for communication with RRDP process.
+ */
+enum rrdp_msg {
+ RRDP_START,
+ RRDP_SESSION,
+ RRDP_FILE,
+ RRDP_END,
+ RRDP_HTTP_REQ,
+ RRDP_HTTP_INI,
+ RRDP_HTTP_FIN
+};
+
+/*
+ * RRDP session state, needed to pickup at the right spot on next run.
+ */
+struct rrdp_session {
+ char *last_mod;
+ char *session_id;
+ long long serial;
+};
+
+/*
+ * File types used in RRDP_FILE messages.
+ */
+enum publish_type {
+ PUB_ADD,
+ PUB_UPD,
+ PUB_DEL,
+};
+
/*
* An entity (MFT, ROA, certificate, etc.) that needs to be downloaded
* and parsed.
};
TAILQ_HEAD(entityq, entity);
+struct repo;
+struct filepath;
+RB_HEAD(filepath_tree, filepath);
+
+
/*
* Statistics collected during run-time.
*/
size_t roas_fail; /* failing syntactic parse */
size_t roas_invalid; /* invalid resources */
size_t repos; /* repositories */
+ size_t rsync_repos; /* synced rsync repositories */
+ size_t rsync_fails; /* failed rsync repositories */
+ size_t http_repos; /* synced http repositories */
+ size_t http_fails; /* failed http repositories */
+ size_t rrdp_repos; /* synced rrdp repositories */
+ size_t rrdp_fails; /* failed rrdp repositories */
size_t crls; /* revocation lists */
size_t gbrs; /* ghostbuster records */
size_t vrps; /* total number of vrps */
/* Routines for RPKI entities. */
+int base64_decode(const unsigned char *, unsigned char **,
+ size_t *);
void tal_buffer(struct ibuf *, const struct tal *);
void tal_free(struct tal *);
struct tal *tal_parse(const char *, char *);
/* Parser-specific */
void entity_free(struct entity *);
void entity_read_req(int fd, struct entity *);
+void entityq_flush(struct entityq *, struct repo *);
void proc_parser(int) __attribute__((noreturn));
/* Rsync-specific. */
char *rsync_base_uri(const char *);
void proc_rsync(char *, char *, int) __attribute__((noreturn));
-/* Http-specific. */
+/* HTTP and RRDP processes. */
void proc_http(char *, int);
+void proc_rrdp(int);
+
+/* Repository handling */
+int filepath_add(struct filepath_tree *, char *);
+void rrdp_save_state(size_t, struct rrdp_session *);
+int rrdp_handle_file(size_t, enum publish_type, char *,
+ char *, size_t, char *, size_t);
+char *repo_filename(const struct repo *, const char *);
+struct repo *ta_lookup(struct tal *);
+struct repo *repo_lookup(const char *, const char *);
+int repo_queued(struct repo *, struct entity *);
+void repo_cleanup(struct filepath_tree *);
+void repo_free(void);
+
+void rsync_finish(size_t, int);
+void http_finish(size_t, enum http_result, const char *);
+void rrdp_finish(size_t, int);
+
+void rsync_fetch(size_t, const char *, const char *);
+void http_fetch(size_t, const char *, const char *, int);
+void rrdp_fetch(size_t, const char *, const char *,
+ struct rrdp_session *);
+void rrdp_http_done(size_t, enum http_result, const char *);
+
/* Logging (though really used for OpenSSL errors). */
-/* $OpenBSD: main.c,v 1.128 2021/04/01 06:53:49 claudio Exp $ */
+/* $OpenBSD: main.c,v 1.129 2021/04/01 16:04:48 claudio Exp $ */
/*
* Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
*
#include <sys/queue.h>
#include <sys/socket.h>
#include <sys/resource.h>
-#include <sys/stat.h>
#include <sys/tree.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <dirent.h>
#include <fcntl.h>
#include <fnmatch.h>
-#include <fts.h>
#include <poll.h>
#include <pwd.h>
#include <stdio.h>
*/
#define TALSZ_MAX 8
-/*
- * An rsync repository.
- */
-#define REPO_MAX_URI 2
-struct repo {
- SLIST_ENTRY(repo) entry;
- char *repouri; /* CA repository base URI */
- char *local; /* local path name */
- char *temp; /* temporary file / dir */
- char *uris[REPO_MAX_URI]; /* URIs to fetch from */
- struct entityq queue; /* files waiting for this repo */
- size_t id; /* identifier (array index) */
- int uriidx; /* which URI is fetched */
- int loaded; /* whether loaded or not */
-};
-
size_t entity_queue;
int timeout = 60*60;
volatile sig_atomic_t killme;
void suicide(int sig);
-/*
- * Table of all known repositories.
- */
-SLIST_HEAD(, repo) repos = SLIST_HEAD_INITIALIZER(repos);
-size_t repoid;
-
-/*
- * Database of all file path accessed during a run.
- */
-struct filepath {
- RB_ENTRY(filepath) entry;
- char *file;
-};
-
-static inline int
-filepathcmp(struct filepath *a, struct filepath *b)
-{
- return strcmp(a->file, b->file);
-}
-
-RB_HEAD(filepath_tree, filepath);
-RB_PROTOTYPE(filepath_tree, filepath, entry, filepathcmp);
-
static struct filepath_tree fpt = RB_INITIALIZER(&fpt);
-static struct msgbuf procq, rsyncq, httpq;
+static struct msgbuf procq, rsyncq, httpq, rrdpq;
static int cachefd, outdirfd;
const char *bird_tablename = "ROAS";
int verbose;
int noop;
+int rrdpon;
struct stats stats;
-static void repo_fetch(struct repo *);
-static char *ta_filename(const struct repo *, int);
-
/*
* Log a message to stderr if and only if "verbose" is non-zero.
* This uses the err(3) functionality.
}
}
-/*
- * Functions to lookup which files have been accessed during computation.
- */
-static int
-filepath_add(char *file)
-{
- struct filepath *fp;
-
- if ((fp = malloc(sizeof(*fp))) == NULL)
- err(1, NULL);
- if ((fp->file = strdup(file)) == NULL)
- err(1, NULL);
-
- if (RB_INSERT(filepath_tree, &fpt, fp) != NULL) {
- /* already in the tree */
- free(fp->file);
- free(fp);
- return 0;
- }
-
- return 1;
-}
-
-static int
-filepath_exists(char *file)
-{
- struct filepath needle;
-
- needle.file = file;
- return RB_FIND(filepath_tree, &fpt, &needle) != NULL;
-}
-
-/*
- * Return true if a filepath entry exists that starts with path.
- */
-static int
-filepath_dir_exists(char *path)
-{
- struct filepath needle;
- struct filepath *res;
-
- needle.file = path;
- res = RB_NFIND(filepath_tree, &fpt, &needle);
- while (res != NULL && strstr(res->file, path) == res->file) {
- /* make sure that filepath acctually is in that path */
- if (res->file[strlen(path)] == '/')
- return 1;
- res = RB_NEXT(filepath_tree, &fpt, res);
- }
- return 0;
-}
-
-RB_GENERATE(filepath_tree, filepath, entry, filepathcmp);
-
void
entity_free(struct entity *ent)
{
{
struct ibuf *b;
+ if (filepath_add(&fpt, ent->file) == 0) {
+ warnx("%s: File already visited", ent->file);
+ return;
+ }
+
if ((b = ibuf_dynamic(sizeof(*ent), UINT_MAX)) == NULL)
err(1, NULL);
io_simple_buffer(b, &ent->type, sizeof(ent->type));
* Scan through all queued requests and see which ones are in the given
* repo, then flush those into the parser process.
*/
-static void
-entityq_flush(struct repo *repo)
+void
+entityq_flush(struct entityq *q, struct repo *rp)
{
struct entity *p, *np;
- TAILQ_FOREACH_SAFE(p, &repo->queue, entries, np) {
+ TAILQ_FOREACH_SAFE(p, q, entries, np) {
+ /*
+ * XXX fixup path here since the repo may change
+ * during load because of fallback. In that case
+ * the file path changes as well since RRDP and RSYNC
+ * can not share a common repo.
+ */
+ char *file = p->file;
+ p->file = repo_filename(rp, file);
+ if (p->file == NULL)
+ err(1, "can't construct repo filename");
+ free(file);
+
entity_write_req(p);
- TAILQ_REMOVE(&repo->queue, p, entries);
+ TAILQ_REMOVE(q, p, entries);
entity_free(p);
}
}
{
struct entity *p;
- if (filepath_add(file) == 0) {
- warnx("%s: File already visited", file);
- return;
- }
-
if ((p = calloc(1, sizeof(struct entity))) == NULL)
err(1, NULL);
* been loaded else enqueue it for later.
*/
- if (rp == NULL || rp->loaded) {
+ if (rp == NULL || !repo_queued(rp, p)) {
+ /*
+ * XXX fixup path here since for repo path the
+ * file path has not yet been fixed here.
+ * This is a quick way to make this work but in
+ * the long run repos need to be passed to the parser.
+ */
+ if (rp != NULL) {
+ file = p->file;
+ p->file = repo_filename(rp, file);
+ if (p->file == NULL)
+ err(1, "can't construct repo filename from %s",
+ file);
+ free(file);
+ }
entity_write_req(p);
entity_free(p);
- } else
- TAILQ_INSERT_TAIL(&rp->queue, p, entries);
+ }
}
-/*
- * Allocate and insert a new repository.
- */
-static struct repo *
-repo_alloc(void)
+static void
+rrdp_file_resp(size_t id, int ok)
{
- struct repo *rp;
+ enum rrdp_msg type = RRDP_FILE;
+ struct ibuf *b;
- if ((rp = calloc(1, sizeof(*rp))) == NULL)
+ if ((b = ibuf_open(sizeof(type) + sizeof(id) + sizeof(ok))) == NULL)
err(1, NULL);
-
- rp->id = ++repoid;
- TAILQ_INIT(&rp->queue);
- SLIST_INSERT_HEAD(&repos, rp, entry);
-
- return rp;
-}
-
-static struct repo *
-repo_find(size_t id)
-{
- struct repo *rp;
-
- SLIST_FOREACH(rp, &repos, entry)
- if (id == rp->id)
- break;
- return rp;
+ io_simple_buffer(b, &type, sizeof(type));
+ io_simple_buffer(b, &id, sizeof(id));
+ io_simple_buffer(b, &ok, sizeof(ok));
+ ibuf_close(&rrdpq, b);
}
-static void
-http_ta_fetch(struct repo *rp)
+void
+rrdp_fetch(size_t id, const char *uri, const char *local,
+ struct rrdp_session *s)
{
- struct ibuf *b;
- int filefd;
-
- rp->temp = ta_filename(rp, 1);
-
- filefd = mkostemp(rp->temp, O_CLOEXEC);
- if (filefd == -1)
- err(1, "mkostemp: %s", rp->temp);
- if (fchmod(filefd, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) == -1)
- warn("fchmod: %s", rp->temp);
+ enum rrdp_msg type = RRDP_START;
+ struct ibuf *b;
if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
err(1, NULL);
- io_simple_buffer(b, &rp->id, sizeof(rp->id));
- io_str_buffer(b, rp->uris[rp->uriidx]);
- /* TODO last modified time */
- io_str_buffer(b, NULL);
- /* pass file as fd */
- b->fd = filefd;
- ibuf_close(&httpq, b);
-}
-
-static int
-http_done(struct repo *rp, enum http_result res)
-{
- if (rp->repouri == NULL) {
- /* Move downloaded TA file into place, or unlink on failure. */
- if (res == HTTP_OK) {
- char *file;
-
- file = ta_filename(rp, 0);
- if (renameat(cachefd, rp->temp, cachefd, file) == -1)
- warn("rename to %s", file);
- } else {
- if (unlinkat(cachefd, rp->temp, 0) == -1)
- warn("unlink %s", rp->temp);
- }
- free(rp->temp);
- rp->temp = NULL;
- }
-
- if (res == HTTP_OK)
- logx("%s: loaded from network", rp->local);
- else if (rp->uriidx < REPO_MAX_URI - 1 &&
- rp->uris[rp->uriidx + 1] != NULL) {
- logx("%s: load from network failed, retry", rp->local);
-
- rp->uriidx++;
- repo_fetch(rp);
- return 0;
- } else
- logx("%s: load from network failed, "
- "fallback to cache", rp->local);
-
- return 1;
-}
-
-static void
-repo_fetch(struct repo *rp)
-{
- struct ibuf *b;
-
- if (noop) {
- rp->loaded = 1;
- logx("%s: using cache", rp->local);
- stats.repos++;
- /* there is nothing in the queue so no need to flush */
- return;
- }
-
- /*
- * Create destination location.
- * Build up the tree to this point.
- */
-
- if (mkpath(rp->local) == -1)
- err(1, "%s", rp->local);
-
- logx("%s: pulling from %s", rp->local, rp->uris[rp->uriidx]);
-
- if (strncasecmp(rp->uris[rp->uriidx], "rsync://", 8) == 0) {
- if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
- err(1, NULL);
- io_simple_buffer(b, &rp->id, sizeof(rp->id));
- io_str_buffer(b, rp->local);
- io_str_buffer(b, rp->uris[rp->uriidx]);
- ibuf_close(&rsyncq, b);
- } else {
- /*
- * Two cases for https. TA files load directly while
- * for RRDP XML files are downloaded and parsed to build
- * the repo. TA repos have a NULL repouri.
- */
- if (rp->repouri == NULL) {
- http_ta_fetch(rp);
- }
- }
+ io_simple_buffer(b, &type, sizeof(type));
+ io_simple_buffer(b, &id, sizeof(id));
+ io_str_buffer(b, local);
+ io_str_buffer(b, uri);
+ io_str_buffer(b, s->session_id);
+ io_simple_buffer(b, &s->serial, sizeof(s->serial));
+ io_str_buffer(b, s->last_mod);
+ ibuf_close(&rrdpq, b);
}
/*
- * Look up a trust anchor, queueing it for download if not found.
+ * Request a repository sync via rsync URI to directory local.
*/
-static struct repo *
-ta_lookup(const struct tal *tal)
+void
+rsync_fetch(size_t id, const char *uri, const char *local)
{
- struct repo *rp;
- char *local;
- size_t i, j;
+ struct ibuf *b;
- if (asprintf(&local, "ta/%s", tal->descr) == -1)
+ if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
err(1, NULL);
-
- /* Look up in repository table. (Lookup should actually fail here) */
- SLIST_FOREACH(rp, &repos, entry) {
- if (strcmp(rp->local, local) != 0)
- continue;
- free(local);
- return rp;
- }
-
- rp = repo_alloc();
- rp->local = local;
- for (i = 0, j = 0; i < tal->urisz && j < 2; i++) {
- if ((rp->uris[j++] = strdup(tal->uri[i])) == NULL)
- err(1, NULL);
- }
- if (j == 0)
- errx(1, "TAL %s has no URI", tal->descr);
-
- repo_fetch(rp);
- return rp;
+ io_simple_buffer(b, &id, sizeof(id));
+ io_str_buffer(b, local);
+ io_str_buffer(b, uri);
+ ibuf_close(&rsyncq, b);
}
/*
- * Look up a repository, queueing it for discovery if not found.
+ * Request a file from a https uri, data is written to the file descriptor fd.
*/
-static struct repo *
-repo_lookup(const char *uri)
+void
+http_fetch(size_t id, const char *uri, const char *last_mod, int fd)
{
- char *local, *repo;
- struct repo *rp;
-
- if ((repo = rsync_base_uri(uri)) == NULL)
- return NULL;
-
- /* Look up in repository table. */
- SLIST_FOREACH(rp, &repos, entry) {
- if (rp->repouri == NULL ||
- strcmp(rp->repouri, repo) != 0)
- continue;
- free(repo);
- return rp;
- }
+ struct ibuf *b;
- rp = repo_alloc();
- rp->repouri = repo;
- local = strchr(repo, ':') + strlen("://");
- if (asprintf(&rp->local, "rsync/%s", local) == -1)
- err(1, NULL);
- if ((rp->uris[0] = strdup(repo)) == NULL)
+ if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
err(1, NULL);
-
- repo_fetch(rp);
- return rp;
+ io_simple_buffer(b, &id, sizeof(id));
+ io_str_buffer(b, uri);
+ io_str_buffer(b, last_mod);
+ /* pass file as fd */
+ b->fd = fd;
+ ibuf_close(&httpq, b);
}
-static char *
-ta_filename(const struct repo *repo, int temp)
+/*
+ * Request some XML file on behalf of the rrdp parser.
+ * Create a pipe and pass the pipe endpoints to the http and rrdp process.
+ */
+static void
+rrdp_http_fetch(size_t id, const char *uri, const char *last_mod)
{
- const char *file;
- char *nfile;
+ enum rrdp_msg type = RRDP_HTTP_INI;
+ struct ibuf *b;
+ int pi[2];
- /* does not matter which URI, all end with same filename */
- file = strrchr(repo->uris[0], '/');
- assert(file);
+ if (pipe2(pi, O_CLOEXEC | O_NONBLOCK) == -1)
+ err(1, "pipe");
- if (asprintf(&nfile, "%s%s%s", repo->local, file,
- temp ? ".XXXXXXXX": "") == -1)
+ if ((b = ibuf_open(sizeof(type) + sizeof(id))) == NULL)
err(1, NULL);
+ io_simple_buffer(b, &type, sizeof(type));
+ io_simple_buffer(b, &id, sizeof(id));
+ b->fd = pi[0];
+ ibuf_close(&rrdpq, b);
- return nfile;
+ http_fetch(id, uri, last_mod, pi[1]);
}
-/*
- * Build local file name base on the URI and the repo info.
- */
-static char *
-repo_filename(const struct repo *repo, const char *uri)
+void
+rrdp_http_done(size_t id, enum http_result res, const char *last_mod)
{
- char *nfile;
-
- if (strstr(uri, repo->repouri) != uri)
- errx(1, "%s: URI outside of repository", uri);
- uri += strlen(repo->repouri) + 1; /* skip base and '/' */
+ enum rrdp_msg type = RRDP_HTTP_FIN;
+ struct ibuf *b;
- if (asprintf(&nfile, "%s/%s", repo->local, uri) == -1)
+ /* RRDP request, relay response over to the rrdp process */
+ if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
err(1, NULL);
- return nfile;
+ io_simple_buffer(b, &type, sizeof(type));
+ io_simple_buffer(b, &id, sizeof(id));
+ io_simple_buffer(b, &res, sizeof(res));
+ io_str_buffer(b, last_mod);
+ ibuf_close(&rrdpq, b);
}
/*
* Add URIs (CER) from a TAL file, RFC 8630.
*/
static void
-queue_add_from_tal(const struct tal *tal)
+queue_add_from_tal(struct tal *tal)
{
- char *nfile;
struct repo *repo;
assert(tal->urisz);
/* Look up the repository. */
repo = ta_lookup(tal);
- nfile = ta_filename(repo, 0);
- entityq_add(nfile, RTYPE_CER, repo, tal->pkey,
+ entityq_add(NULL, RTYPE_CER, repo, tal->pkey,
tal->pkeysz, tal->descr);
}
struct repo *repo;
char *nfile;
- repo = repo_lookup(cert->mft);
- if (repo == NULL) /* bad repository URI */
+ repo = repo_lookup(cert->repo, rrdpon ? cert->notify : NULL);
+ if (repo == NULL) {
+ warnx("%s: repository lookup failed", cert->repo);
return;
+ }
- nfile = repo_filename(repo, cert->mft);
-
+ if ((nfile = strdup(cert->mft)) == NULL)
+ err(1, NULL);
entityq_add(nfile, RTYPE_MFT, repo, NULL, 0, NULL);
}
return s;
}
-static char **
-add_to_del(char **del, size_t *dsz, char *file)
-{
- size_t i = *dsz;
-
- del = reallocarray(del, i + 1, sizeof(*del));
- if (del == NULL)
- err(1, NULL);
- if ((del[i] = strdup(file)) == NULL)
- err(1, NULL);
- *dsz = i + 1;
- return del;
-}
-
-static void
-repo_cleanup(void)
-{
- size_t i, delsz = 0, dirsz = 0;
- char *argv[3], **del = NULL, **dir = NULL;
- FTS *fts;
- FTSENT *e;
-
- argv[0] = "ta";
- argv[1] = "rsync";
- argv[2] = NULL;
- if ((fts = fts_open(argv, FTS_PHYSICAL | FTS_NOSTAT, NULL)) == NULL)
- err(1, "fts_open");
- errno = 0;
- while ((e = fts_read(fts)) != NULL) {
- switch (e->fts_info) {
- case FTS_NSOK:
- if (!filepath_exists(e->fts_path))
- del = add_to_del(del, &delsz,
- e->fts_path);
- break;
- case FTS_D:
- break;
- case FTS_DP:
- if (!filepath_dir_exists(e->fts_path))
- dir = add_to_del(dir, &dirsz,
- e->fts_path);
- break;
- case FTS_SL:
- case FTS_SLNONE:
- warnx("symlink %s", e->fts_path);
- del = add_to_del(del, &delsz, e->fts_path);
- break;
- case FTS_NS:
- case FTS_ERR:
- warnx("fts_read %s: %s", e->fts_path,
- strerror(e->fts_errno));
- break;
- default:
- warnx("unhandled[%x] %s", e->fts_info,
- e->fts_path);
- break;
- }
-
- errno = 0;
- }
- if (errno)
- err(1, "fts_read");
- if (fts_close(fts) == -1)
- err(1, "fts_close");
-
- for (i = 0; i < delsz; i++) {
- if (unlink(del[i]) == -1)
- warn("unlink %s", del[i]);
- if (verbose > 1)
- logx("deleted %s", del[i]);
- free(del[i]);
- }
- free(del);
- stats.del_files = delsz;
-
- for (i = 0; i < dirsz; i++) {
- if (rmdir(dir[i]) == -1)
- warn("rmdir %s", dir[i]);
- if (verbose > 1)
- logx("deleted dir %s", dir[i]);
- free(dir[i]);
- }
- free(dir);
- stats.del_dirs = dirsz;
-}
-
void
suicide(int sig __attribute__((unused)))
{
}
-#define NPFD 3
+#define NPFD 4
int
main(int argc, char *argv[])
{
- int rc = 1, c, st, proc, rsync, http, ok,
+ int rc = 1, c, st, proc, rsync, http, rrdp, ok,
fl = SOCK_STREAM | SOCK_CLOEXEC;
size_t i, id, outsz = 0, talsz = 0;
- pid_t procpid, rsyncpid, httppid;
+ pid_t procpid, rsyncpid, httppid, rrdppid;
int fd[2];
struct pollfd pfd[NPFD];
struct msgbuf *queues[NPFD];
struct roa **out = NULL;
- struct repo *rp;
char *rsync_prog = "openrsync";
char *bind_addr = NULL;
const char *cachedir = NULL, *outputdir = NULL;
setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) == -1 ||
setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid) == -1)
err(1, "unable to revoke privs");
-
}
cachedir = RPKI_PATH_BASE_DIR;
outputdir = RPKI_PATH_OUT_DIR;
"proc exec unveil", NULL) == -1)
err(1, "pledge");
- while ((c = getopt(argc, argv, "b:Bcd:e:jnos:t:T:vV")) != -1)
+ while ((c = getopt(argc, argv, "b:Bcd:e:jnorRs:t:T:vV")) != -1)
switch (c) {
case 'b':
bind_addr = optarg;
case 'o':
outformats |= FORMAT_OPENBGPD;
break;
+ case 'R':
+ rrdpon = 0;
+ break;
+ case 'r':
+ rrdpon = 1;
+ break;
case 's':
timeout = strtonum(optarg, 0, 24*60*60, &errs);
if (errs)
httppid = -1;
}
+ /*
+ * Create a process that will process RRDP.
+ * The rrdp process requires the http process to fetch the various
+ * XML files and does this via the main process.
+ */
+
+ if (!noop && rrdpon) {
+ if (socketpair(AF_UNIX, fl, 0, fd) == -1)
+ err(1, "socketpair");
+ if ((rrdppid = fork()) == -1)
+ err(1, "fork");
+
+ if (rrdppid == 0) {
+ close(proc);
+ close(rsync);
+ close(http);
+ close(fd[1]);
+
+ /* change working directory to the cache directory */
+ if (fchdir(cachefd) == -1)
+ err(1, "fchdir");
+
+ if (pledge("stdio recvfd", NULL) == -1)
+ err(1, "pledge");
+
+ proc_rrdp(fd[0]);
+ /* NOTREACHED */
+ }
+
+ close(fd[0]);
+ rrdp = fd[1];
+ } else
+ rrdp = -1;
+
+ /* TODO unveil chachedir and outputdir, no other access allowed */
if (pledge("stdio rpath wpath cpath fattr sendfd", NULL) == -1)
err(1, "pledge");
msgbuf_init(&procq);
msgbuf_init(&rsyncq);
msgbuf_init(&httpq);
+ msgbuf_init(&rrdpq);
procq.fd = proc;
rsyncq.fd = rsync;
httpq.fd = http;
+ rrdpq.fd = rrdp;
/*
* The main process drives the top-down scan to leaf ROAs using
queues[1] = &procq;
pfd[2].fd = http;
queues[2] = &httpq;
+ pfd[3].fd = rrdp;
+ queues[3] = &rrdpq;
/*
* Prime the process with our TAL file.
if (pfd[i].revents & POLLHUP)
errx(1, "poll[%zu]: hangup", i);
if (pfd[i].revents & POLLOUT) {
+ /*
+ * XXX work around deadlocks because of
+ * blocking read vs non-blocking writes.
+ */
+ if (i > 1)
+ io_socket_nonblocking(pfd[i].fd);
switch (msgbuf_write(queues[i])) {
case 0:
errx(1, "write[%zu]: "
case -1:
err(1, "write[%zu]", i);
}
+ if (i > 1)
+ io_socket_blocking(pfd[i].fd);
}
}
if ((pfd[0].revents & POLLIN)) {
io_simple_read(rsync, &id, sizeof(id));
io_simple_read(rsync, &ok, sizeof(ok));
- rp = repo_find(id);
- if (rp == NULL)
- errx(1, "unknown repository id: %zu", id);
-
- assert(!rp->loaded);
- if (ok)
- logx("%s: loaded from network", rp->local);
- else
- logx("%s: load from network failed, "
- "fallback to cache", rp->local);
- rp->loaded = 1;
- stats.repos++;
- entityq_flush(rp);
+ rsync_finish(id, ok);
}
if ((pfd[2].revents & POLLIN)) {
io_simple_read(http, &id, sizeof(id));
io_simple_read(http, &res, sizeof(res));
io_str_read(http, &last_mod);
- rp = repo_find(id);
- if (rp == NULL)
- errx(1, "unknown repository id: %zu", id);
-
- assert(!rp->loaded);
- if (http_done(rp, res)) {
- rp->loaded = 1;
- stats.repos++;
- entityq_flush(rp);
- }
+ http_finish(id, res, last_mod);
free(last_mod);
}
+ /*
+ * Handle RRDP requests here.
+ */
+ if ((pfd[3].revents & POLLIN)) {
+ enum rrdp_msg type;
+ enum publish_type pt;
+ struct rrdp_session s;
+ char *uri, *last_mod, *data;
+ char hash[SHA256_DIGEST_LENGTH];
+ size_t dsz;
+
+ io_simple_read(rrdp, &type, sizeof(type));
+ io_simple_read(rrdp, &id, sizeof(id));
+
+ switch (type) {
+ case RRDP_END:
+ io_simple_read(rrdp, &ok, sizeof(ok));
+ rrdp_finish(id, ok);
+ break;
+ case RRDP_HTTP_REQ:
+ io_str_read(rrdp, &uri);
+ io_str_read(rrdp, &last_mod);
+ rrdp_http_fetch(id, uri, last_mod);
+ break;
+ case RRDP_SESSION:
+ io_str_read(rrdp, &s.session_id);
+ io_simple_read(rrdp, &s.serial,
+ sizeof(s.serial));
+ io_str_read(rrdp, &s.last_mod);
+ rrdp_save_state(id, &s);
+ free(s.session_id);
+ free(s.last_mod);
+ break;
+ case RRDP_FILE:
+ io_simple_read(rrdp, &pt, sizeof(pt));
+ if (pt != PUB_ADD)
+ io_simple_read(rrdp, &hash,
+ sizeof(hash));
+ io_str_read(rrdp, &uri);
+ io_buf_read_alloc(rrdp, (void **)&data, &dsz);
+
+ ok = rrdp_handle_file(id, pt, uri,
+ hash, sizeof(hash), data, dsz);
+ rrdp_file_resp(id, ok);
+
+ free(uri);
+ free(data);
+ break;
+ default:
+ errx(1, "unexpected rrdp response");
+ }
+ }
+
/*
* The parser has finished something for us.
* Dequeue these one by one.
close(proc);
close(rsync);
close(http);
+ close(rrdp);
if (waitpid(procpid, &st, 0) == -1)
err(1, "waitpid");
warnx("http process exited abnormally");
rc = 1;
}
+
+ if (rrdpon) {
+ if (waitpid(rrdppid, &st, 0) == -1)
+ err(1, "waitpid");
+ if (!WIFEXITED(st) || WEXITSTATUS(st) != 0) {
+ warnx("rrdp process exited abnormally");
+ rc = 1;
+ }
+ }
}
- repo_cleanup();
+ repo_cleanup(&fpt);
gettimeofday(&now_time, NULL);
timersub(&now_time, &start_time, &stats.elapsed_time);
logx("VRP Entries: %zu (%zu unique)", stats.vrps, stats.uniqs);
/* Memory cleanup. */
- while ((rp = SLIST_FIRST(&repos)) != NULL) {
- SLIST_REMOVE_HEAD(&repos, entry);
- free(rp->repouri);
- free(rp->local);
- free(rp->temp);
- free(rp->uris[0]);
- free(rp->uris[1]);
- free(rp);
- }
+ repo_free();
for (i = 0; i < outsz; i++)
roa_free(out[i]);
usage:
fprintf(stderr,
- "usage: rpki-client [-BcjnoVv] [-b sourceaddr] [-d cachedir]"
+ "usage: rpki-client [-BcjnorRVv] [-b sourceaddr] [-d cachedir]"
" [-e rsync_prog]\n"
" [-s timeout] [-T table] [-t tal]"
" [outputdir]\n");
--- /dev/null
+/* $OpenBSD: repo.c,v 1.1 2021/04/01 16:04:48 claudio Exp $ */
+/*
+ * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
+ * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/queue.h>
+#include <sys/tree.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include <assert.h>
+#include <err.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <fts.h>
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <imsg.h>
+
+#include "extern.h"
+
+extern struct stats stats;
+extern int noop;
+
+enum repo_state {
+ REPO_LOADING = 0,
+ REPO_DONE = 1,
+ REPO_FAILED = -1,
+};
+
+/*
+ * A ta, rsync or rrdp repository.
+ * Depending on what is needed the generic repository is backed by
+ * a ta, rsync or rrdp reposityry. Multiple repositories can use the
+ * same backend.
+ */
+struct rrdprepo {
+ SLIST_ENTRY(rrdprepo) entry;
+ char *notifyuri;
+ char *basedir;
+ char *temp;
+ struct filepath_tree added;
+ struct filepath_tree deleted;
+ size_t id;
+ enum repo_state state;
+};
+SLIST_HEAD(, rrdprepo) rrdprepos = SLIST_HEAD_INITIALIZER(rrdprepos);
+
+struct rsyncrepo {
+ SLIST_ENTRY(rsyncrepo) entry;
+ char *repouri;
+ char *basedir;
+ size_t id;
+ enum repo_state state;
+};
+SLIST_HEAD(, rsyncrepo) rsyncrepos = SLIST_HEAD_INITIALIZER(rsyncrepos);
+
+struct tarepo {
+ SLIST_ENTRY(tarepo) entry;
+ char *descr;
+ char *basedir;
+ char *temp;
+ char **uri;
+ size_t urisz;
+ size_t uriidx;
+ size_t id;
+ enum repo_state state;
+};
+SLIST_HEAD(, tarepo) tarepos = SLIST_HEAD_INITIALIZER(tarepos);
+
+struct repo {
+ SLIST_ENTRY(repo) entry;
+ char *repouri; /* CA repository base URI */
+ const struct rrdprepo *rrdp;
+ const struct rsyncrepo *rsync;
+ const struct tarepo *ta;
+ struct entityq queue; /* files waiting for repo */
+ size_t id; /* identifier */
+};
+SLIST_HEAD(, repo) repos = SLIST_HEAD_INITIALIZER(repos);
+
+/* counter for unique repo id */
+size_t repoid;
+
+/*
+ * Database of all file path accessed during a run.
+ */
+struct filepath {
+ RB_ENTRY(filepath) entry;
+ char *file;
+};
+
+static inline int
+filepathcmp(struct filepath *a, struct filepath *b)
+{
+ return strcmp(a->file, b->file);
+}
+
+RB_PROTOTYPE(filepath_tree, filepath, entry, filepathcmp);
+
+/*
+ * Functions to lookup which files have been accessed during computation.
+ */
+int
+filepath_add(struct filepath_tree *tree, char *file)
+{
+ struct filepath *fp;
+
+ if ((fp = malloc(sizeof(*fp))) == NULL)
+ err(1, NULL);
+ if ((fp->file = strdup(file)) == NULL)
+ err(1, NULL);
+
+ if (RB_INSERT(filepath_tree, tree, fp) != NULL) {
+ /* already in the tree */
+ free(fp->file);
+ free(fp);
+ return 0;
+ }
+
+ return 1;
+}
+
+/*
+ * Lookup a file path in the tree and return the object if found or NULL.
+ */
+static struct filepath *
+filepath_find(struct filepath_tree *tree, char *file)
+{
+ struct filepath needle;
+
+ needle.file = file;
+ return RB_FIND(filepath_tree, tree, &needle);
+}
+
+/*
+ * Returns true if file exists in the tree.
+ */
+static int
+filepath_exists(struct filepath_tree *tree, char *file)
+{
+ return filepath_find(tree, file) != NULL;
+}
+
+/*
+ * Return true if a filepath entry exists that starts with path.
+ */
+static int
+filepath_dir_exists(struct filepath_tree *tree, char *path)
+{
+ struct filepath needle;
+ struct filepath *res;
+
+ needle.file = path;
+ res = RB_NFIND(filepath_tree, tree, &needle);
+ while (res != NULL && strstr(res->file, path) == res->file) {
+ /* make sure that filepath acctually is in that path */
+ if (res->file[strlen(path)] == '/')
+ return 1;
+ res = RB_NEXT(filepath_tree, tree, res);
+ }
+ return 0;
+}
+
+/*
+ * Remove entry from tree and free it.
+ */
+static void
+filepath_put(struct filepath_tree *tree, struct filepath *fp)
+{
+ RB_REMOVE(filepath_tree, tree, fp);
+ free((void *)fp->file);
+ free(fp);
+}
+
+/*
+ * Free all elements of a filepath tree.
+ */
+static void
+filepath_free(struct filepath_tree *tree)
+{
+ struct filepath *fp, *nfp;
+
+ RB_FOREACH_SAFE(fp, filepath_tree, tree, nfp)
+ filepath_put(tree, fp);
+}
+
+RB_GENERATE(filepath_tree, filepath, entry, filepathcmp);
+
+/*
+ * Function to hash a string into a unique directory name.
+ * prefixed with dir.
+ */
+static char *
+hash_dir(const char *uri, const char *dir)
+{
+ const char hex[] = "0123456789abcdef";
+ unsigned char m[SHA256_DIGEST_LENGTH];
+ char hash[SHA256_DIGEST_LENGTH * 2 + 1];
+ char *out;
+ size_t i;
+
+ SHA256(uri, strlen(uri), m);
+ for (i = 0; i < SHA256_DIGEST_LENGTH; i++) {
+ hash[i * 2] = hex[m[i] >> 4];
+ hash[i * 2 + 1] = hex[m[i] & 0xf];
+ }
+ hash[SHA256_DIGEST_LENGTH * 2] = '\0';
+
+ asprintf(&out, "%s/%s", dir, hash);
+ return out;
+}
+
+/*
+ * Function to build the directory name based on URI and a directory
+ * as prefix. Skip the proto:// in URI but keep everything else.
+ */
+static char *
+rsync_dir(const char *uri, const char *dir)
+{
+ char *local, *out;
+
+ local = strchr(uri, ':') + strlen("://");
+
+ asprintf(&out, "%s/%s", dir, local);
+ return out;
+}
+
+/*
+ * Function to create all missing directories to a path.
+ * This functions alters the path temporarily.
+ */
+static void
+repo_mkpath(char *file)
+{
+ char *slash;
+
+ /* build directory hierarchy */
+ slash = strrchr(file, '/');
+ assert(slash != NULL);
+ *slash = '\0';
+ if (mkpath(file) == -1)
+ err(1, "%s", file);
+ *slash = '/';
+}
+
+/*
+ * Build TA file name based on the repo info.
+ * If temp is set add Xs for mkostemp.
+ */
+static char *
+ta_filename(const struct tarepo *tr, int temp)
+{
+ const char *file;
+ char *nfile;
+
+ /* does not matter which URI, all end with same filename */
+ file = strrchr(tr->uri[0], '/');
+ assert(file);
+
+ if (asprintf(&nfile, "%s%s%s", tr->basedir, file,
+ temp ? ".XXXXXXXX": "") == -1)
+ err(1, NULL);
+
+ return nfile;
+}
+
+/*
+ * Build local file name base on the URI and the rrdprepo info.
+ */
+static char *
+rrdp_filename(const struct rrdprepo *rr, const char *uri, int temp)
+{
+ char *nfile;
+ char *dir = rr->basedir;
+
+ if (temp)
+ dir = rr->temp;
+
+ if (!valid_uri(uri, strlen(uri), "rsync://")) {
+ warnx("%s: bad URI %s", rr->basedir, uri);
+ return NULL;
+ }
+
+ uri += strlen("rsync://"); /* skip proto */
+ if (asprintf(&nfile, "%s/%s", dir, uri) == -1)
+ err(1, NULL);
+ return nfile;
+}
+
+/*
+ * Build RRDP state file name based on the repo info.
+ * If temp is set add Xs for mkostemp.
+ */
+static char *
+rrdp_state_filename(const struct rrdprepo *rr, int temp)
+{
+ char *nfile;
+
+ if (asprintf(&nfile, "%s/.state%s", rr->basedir,
+ temp ? ".XXXXXXXX": "") == -1)
+ err(1, NULL);
+
+ return nfile;
+}
+
+
+
+static void
+ta_fetch(struct tarepo *tr)
+{
+ int fd;
+
+ logx("ta/%s: pulling from %s", tr->descr, tr->uri[tr->uriidx]);
+
+ if (strncasecmp(tr->uri[tr->uriidx], "rsync://", 8) == 0) {
+ /*
+ * Create destination location.
+ * Build up the tree to this point.
+ */
+ rsync_fetch(tr->id, tr->uri[tr->uriidx], tr->basedir);
+ } else {
+ tr->temp = ta_filename(tr, 1);
+ fd = mkostemp(tr->temp, O_CLOEXEC);
+ if (fd == -1) {
+ err(1, "mkostemp: %s", tr->temp);
+ /* XXX switch to soft fail and restart with next file */
+ }
+ if (fchmod(fd, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) == -1)
+ warn("fchmod: %s", tr->temp);
+
+ http_fetch(tr->id, tr->uri[tr->uriidx], NULL, fd);
+ }
+}
+
+static struct tarepo *
+ta_get(struct tal *tal)
+{
+ struct tarepo *tr;
+
+ /* no need to look for possible other repo */
+
+ if (tal->urisz == 0)
+ errx(1, "TAL %s has no URI", tal->descr);
+
+ if ((tr = calloc(1, sizeof(*tr))) == NULL)
+ err(1, NULL);
+ tr->id = ++repoid;
+ SLIST_INSERT_HEAD(&tarepos, tr, entry);
+
+ if ((tr->descr = strdup(tal->descr)) == NULL)
+ err(1, NULL);
+ if (asprintf(&tr->basedir, "ta/%s", tal->descr) == -1)
+ err(1, NULL);
+
+ /* steal URI infromation from TAL */
+ tr->urisz = tal->urisz;
+ tr->uri = tal->uri;
+ tal->urisz = 0;
+ tal->uri = NULL;
+
+ /* create base directory */
+ if (mkpath(tr->basedir) == -1)
+ err(1, "%s", tr->basedir);
+
+ if (noop) {
+ tr->state = REPO_DONE;
+ logx("ta/%s: using cache", tr->descr);
+ /* there is nothing in the queue so no need to flush */
+ } else
+ ta_fetch(tr);
+
+ return tr;
+}
+
+static struct tarepo *
+ta_find(size_t id)
+{
+ struct tarepo *tr;
+
+ SLIST_FOREACH(tr, &tarepos, entry)
+ if (id == tr->id)
+ break;
+ return tr;
+}
+
+static void
+ta_free(void)
+{
+ struct tarepo *tr;
+
+ while ((tr = SLIST_FIRST(&tarepos)) != NULL) {
+ SLIST_REMOVE_HEAD(&tarepos, entry);
+ free(tr->descr);
+ free(tr->basedir);
+ free(tr->temp);
+ free(tr->uri);
+ free(tr);
+ }
+}
+
+static struct rsyncrepo *
+rsync_get(const char *uri)
+{
+ struct rsyncrepo *rr;
+ char *repo;
+
+ if ((repo = rsync_base_uri(uri)) == NULL)
+ errx(1, "bad caRepository URI: %s", uri);
+
+ SLIST_FOREACH(rr, &rsyncrepos, entry)
+ if (strcmp(rr->repouri, repo) == 0) {
+ free(repo);
+ return rr;
+ }
+
+ if ((rr = calloc(1, sizeof(*rr))) == NULL)
+ err(1, NULL);
+
+ rr->id = ++repoid;
+ SLIST_INSERT_HEAD(&rsyncrepos, rr, entry);
+
+ rr->repouri = repo;
+ rr->basedir = rsync_dir(repo, "rsync");
+
+ /* create base directory */
+ if (mkpath(rr->basedir) == -1)
+ err(1, "%s", rr->basedir);
+
+ if (noop) {
+ rr->state = REPO_DONE;
+ logx("%s: using cache", rr->basedir);
+ /* there is nothing in the queue so no need to flush */
+ } else {
+ logx("%s: pulling from %s", rr->basedir, rr->repouri);
+ rsync_fetch(rr->id, rr->repouri, rr->basedir);
+ }
+
+ return rr;
+}
+
+static struct rsyncrepo *
+rsync_find(size_t id)
+{
+ struct rsyncrepo *rr;
+
+ SLIST_FOREACH(rr, &rsyncrepos, entry)
+ if (id == rr->id)
+ break;
+ return rr;
+}
+
+static void
+rsync_free(void)
+{
+ struct rsyncrepo *rr;
+
+ while ((rr = SLIST_FIRST(&rsyncrepos)) != NULL) {
+ SLIST_REMOVE_HEAD(&rsyncrepos, entry);
+ free(rr->repouri);
+ free(rr->basedir);
+ free(rr);
+ }
+}
+
+static void rrdprepo_fetch(struct rrdprepo *);
+
+static struct rrdprepo *
+rrdp_get(const char *uri)
+{
+ struct rrdprepo *rr;
+
+ SLIST_FOREACH(rr, &rrdprepos, entry)
+ if (strcmp(rr->notifyuri, uri) == 0) {
+ if (rr->state == REPO_FAILED)
+ return NULL;
+ return rr;
+ }
+
+ if ((rr = calloc(1, sizeof(*rr))) == NULL)
+ err(1, NULL);
+
+ rr->id = ++repoid;
+ SLIST_INSERT_HEAD(&rrdprepos, rr, entry);
+
+ if ((rr->notifyuri = strdup(uri)) == NULL)
+ err(1, NULL);
+ rr->basedir = hash_dir(uri, "rrdp");
+
+ RB_INIT(&rr->added);
+ RB_INIT(&rr->deleted);
+
+ /* create base directory */
+ if (mkpath(rr->basedir) == -1)
+ err(1, "%s", rr->basedir);
+
+ if (noop) {
+ rr->state = REPO_DONE;
+ logx("%s: using cache", rr->notifyuri);
+ /* there is nothing in the queue so no need to flush */
+ } else {
+ logx("%s: pulling from %s", rr->notifyuri, "network");
+ rrdprepo_fetch(rr);
+ }
+
+ return rr;
+}
+
+static struct rrdprepo *
+rrdp_find(size_t id)
+{
+ struct rrdprepo *rr;
+
+ SLIST_FOREACH(rr, &rrdprepos, entry)
+ if (id == rr->id)
+ break;
+ return rr;
+}
+
+static void
+rrdp_free(void)
+{
+ struct rrdprepo *rr;
+
+ while ((rr = SLIST_FIRST(&rrdprepos)) != NULL) {
+ SLIST_REMOVE_HEAD(&rrdprepos, entry);
+
+ free(rr->notifyuri);
+ free(rr->basedir);
+ free(rr->temp);
+
+ filepath_free(&rr->added);
+ filepath_free(&rr->deleted);
+
+ free(rr);
+ }
+}
+
+static int
+rrdp_basedir(const char *dir)
+{
+ struct rrdprepo *rr;
+
+ SLIST_FOREACH(rr, &rrdprepos, entry)
+ if (strcmp(dir, rr->basedir) == 0)
+ return 1;
+
+ return 0;
+}
+
+/*
+ * Allocate and insert a new repository.
+ */
+static struct repo *
+repo_alloc(void)
+{
+ struct repo *rp;
+
+ if ((rp = calloc(1, sizeof(*rp))) == NULL)
+ err(1, NULL);
+
+ rp->id = ++repoid;
+ TAILQ_INIT(&rp->queue);
+ SLIST_INSERT_HEAD(&repos, rp, entry);
+
+ stats.repos++;
+ return rp;
+}
+
+/*
+ * Return the state of a repository.
+ */
+static enum repo_state
+repo_state(struct repo *rp)
+{
+ if (rp->ta)
+ return rp->ta->state;
+ if (rp->rrdp)
+ return rp->rrdp->state;
+ if (rp->rsync)
+ return rp->rsync->state;
+ errx(1, "%s: bad repo", rp->repouri);
+}
+
+#if 0
+/*
+ * locate a repository by ID.
+ */
+static struct repo *
+repo_find(size_t id)
+{
+ struct repo *rp;
+
+ SLIST_FOREACH(rp, &repos, entry)
+ if (id == rp->id)
+ break;
+ return rp;
+}
+#endif
+
+
+/*
+ * Parse the RRDP state file if it exists and set the session struct
+ * based on that information.
+ */
+static void
+rrdp_parse_state(const struct rrdprepo *rr, struct rrdp_session *state)
+{
+ FILE *f;
+ int fd, ln = 0;
+ const char *errstr;
+ char *line = NULL, *file;
+ size_t len = 0;
+ ssize_t n;
+
+ file = rrdp_state_filename(rr, 0);
+ if ((fd = open(file, O_RDONLY)) == -1) {
+ if (errno != ENOENT)
+ warn("%s: open state file", rr->basedir);
+ free(file);
+ return;
+ }
+ free(file);
+ f = fdopen(fd, "r");
+ if (f == NULL)
+ err(1, "fdopen");
+
+ while ((n = getline(&line, &len, f)) != -1) {
+ if (line[n - 1] == '\n')
+ line[n - 1] = '\0';
+ switch (ln) {
+ case 0:
+ if ((state->session_id = strdup(line)) == NULL)
+ err(1, NULL);
+ break;
+ case 1:
+ state->serial = strtonum(line, 1, LLONG_MAX, &errstr);
+ if (errstr)
+ goto fail;
+ break;
+ case 2:
+ if ((state->last_mod = strdup(line)) == NULL)
+ err(1, NULL);
+ break;
+ default:
+ goto fail;
+ }
+ ln++;
+ }
+
+ free(line);
+ if (ferror(f))
+ goto fail;
+ fclose(f);
+ return;
+
+fail:
+ warnx("%s: troubles reading state file", rr->basedir);
+ fclose(f);
+ free(state->session_id);
+ free(state->last_mod);
+ memset(state, 0, sizeof(*state));
+}
+
+/*
+ * Carefully write the RRDP session state file back.
+ */
+void
+rrdp_save_state(size_t id, struct rrdp_session *state)
+{
+ struct rrdprepo *rr;
+ char *temp, *file;
+ FILE *f;
+ int fd;
+
+ rr = rrdp_find(id);
+ if (rr == NULL)
+ errx(1, "non-existant rrdp repo %zu", id);
+
+ file = rrdp_state_filename(rr, 0);
+ temp = rrdp_state_filename(rr, 1);
+
+ if ((fd = mkostemp(temp, O_CLOEXEC)) == -1)
+ err(1, "%s: mkostemp: %s", rr->basedir, temp);
+ (void) fchmod(fd, 0644);
+ f = fdopen(fd, "w");
+ if (f == NULL)
+ err(1, "fdopen");
+
+ /* write session state file out */
+ if (fprintf(f, "%s\n%lld\n", state->session_id,
+ state->serial) < 0) {
+ fclose(f);
+ goto fail;
+ }
+ if (state->last_mod != NULL) {
+ if (fprintf(f, "%s\n", state->last_mod) < 0) {
+ fclose(f);
+ goto fail;
+ }
+ }
+ if (fclose(f) != 0)
+ goto fail;
+
+ if (rename(temp, file) == -1)
+ warn("%s: rename state file", rr->basedir);
+
+ free(temp);
+ free(file);
+ return;
+
+fail:
+ warnx("%s: failed to save state", rr->basedir);
+ unlink(temp);
+ free(temp);
+ free(file);
+}
+
+int
+rrdp_handle_file(size_t id, enum publish_type pt, char *uri,
+ char *hash, size_t hlen, char *data, size_t dlen)
+{
+ struct rrdprepo *rr;
+ struct filepath *fp;
+ ssize_t s;
+ char *fn;
+ int fd;
+
+ rr = rrdp_find(id);
+ if (rr == NULL)
+ errx(1, "non-existant rrdp repo %zu", id);
+
+ /* belt and suspenders */
+ if (!valid_uri(uri, strlen(uri), "rsync://")) {
+ warnx("%s: bad file URI", rr->basedir);
+ return 0;
+ }
+
+ if (pt == PUB_UPD || pt == PUB_DEL) {
+ if (filepath_exists(&rr->deleted, uri)) {
+ warnx("%s: already deleted", uri);
+ return 0;
+ }
+ fp = filepath_find(&rr->added, uri);
+ if (fp == NULL) {
+ if ((fn = rrdp_filename(rr, uri, 0)) == NULL)
+ return 0;
+ } else {
+ filepath_put(&rr->added, fp);
+ if ((fn = rrdp_filename(rr, uri, 1)) == NULL)
+ return 0;
+ }
+ if (!valid_filehash(fn, hash, hlen)) {
+ warnx("%s: bad message digest", fn);
+ free(fn);
+ return 0;
+ }
+ free(fn);
+ }
+
+ if (pt == PUB_DEL) {
+ filepath_add(&rr->deleted, uri);
+ } else {
+ /* add new file to temp dir */
+ if ((fn = rrdp_filename(rr, uri, 1)) == NULL)
+ return 0;
+
+ repo_mkpath(fn);
+ fd = open(fn, O_WRONLY|O_CREAT|O_TRUNC, 0644);
+ if (fd == -1) {
+ warn("open %s", fn);
+ free(fn);
+ return 0;
+ }
+
+ if ((s = write(fd, data, dlen)) == -1) {
+ warn("write %s", fn);
+ free(fn);
+ close(fd);
+ return 0;
+ }
+ close(fd);
+ if ((size_t)s != dlen) {
+ warnx("short write %s", fn);
+ free(fn);
+ return 0;
+ }
+ free(fn);
+ filepath_add(&rr->added, uri);
+ }
+
+ return 1;
+}
+
+/*
+ * Initiate a RRDP sync, create the required temporary directory and
+ * parse a possible state file before sending the request to the RRDP process.
+ */
+static void
+rrdprepo_fetch(struct rrdprepo *rr)
+{
+ struct rrdp_session state = { 0 };
+
+ if (asprintf(&rr->temp, "%s.XXXXXXXX", rr->basedir) == -1)
+ err(1, NULL);
+ if (mkdtemp(rr->temp) == NULL)
+ err(1, "mkdtemp %s", rr->temp);
+
+ rrdp_parse_state(rr, &state);
+ rrdp_fetch(rr->id, rr->notifyuri, rr->notifyuri, &state);
+
+ free(state.session_id);
+ free(state.last_mod);
+}
+
+static void
+rrdp_merge_repo(struct rrdprepo *rr)
+{
+ struct filepath *fp, *nfp;
+ char *fn, *rfn;
+
+ /* XXX should delay deletes */
+ RB_FOREACH_SAFE(fp, filepath_tree, &rr->deleted, nfp) {
+ if ((fn = rrdp_filename(rr, fp->file, 0)) != NULL) {
+ if (unlink(fn) == -1)
+ warn("%s: unlink", fn);
+ free(fn);
+ }
+ filepath_put(&rr->deleted, fp);
+ }
+
+ RB_FOREACH_SAFE(fp, filepath_tree, &rr->added, nfp) {
+ if ((fn = rrdp_filename(rr, fp->file, 1)) != NULL &&
+ (rfn = rrdp_filename(rr, fp->file, 0)) != NULL) {
+ repo_mkpath(rfn);
+ if (rename(fn, rfn) == -1)
+ warn("%s: link", rfn);
+ free(rfn);
+ }
+ free(fn);
+ filepath_put(&rr->added, fp);
+ }
+}
+
+static void
+rrdp_clean_temp(struct rrdprepo *rr)
+{
+ struct filepath *fp, *nfp;
+ char *fn;
+
+ filepath_free(&rr->deleted);
+
+ RB_FOREACH_SAFE(fp, filepath_tree, &rr->added, nfp) {
+ if ((fn = rrdp_filename(rr, fp->file, 1)) != NULL) {
+ if (unlink(fn) == -1)
+ warn("%s: unlink", fn);
+ free(fn);
+ }
+ filepath_put(&rr->added, fp);
+ }
+}
+
+/*
+ * RSYNC sync finished, either with or without success.
+ */
+void
+rsync_finish(size_t id, int ok)
+{
+ struct rsyncrepo *rr;
+ struct tarepo *tr;
+ struct repo *rp;
+
+ tr = ta_find(id);
+ if (tr != NULL) {
+ if (ok) {
+ logx("ta/%s: loaded from network", tr->descr);
+ stats.rsync_repos++;
+ tr->state = REPO_DONE;
+ } else if (++tr->uriidx < tr->urisz) {
+ logx("ta/%s: load from network failed, retry",
+ tr->descr);
+ ta_fetch(tr);
+ return;
+ } else {
+ logx("ta/%s: load from network failed, "
+ "fallback to cache", tr->descr);
+ stats.rsync_fails++;
+ tr->state = REPO_FAILED;
+ }
+ SLIST_FOREACH(rp, &repos, entry)
+ if (rp->ta == tr)
+ entityq_flush(&rp->queue, rp);
+
+ return;
+ }
+
+ rr = rsync_find(id);
+ if (rr == NULL)
+ errx(1, "unknown rsync repo %zu", id);
+
+ if (ok) {
+ logx("%s: loaded from network", rr->basedir);
+ stats.rsync_repos++;
+ rr->state = REPO_DONE;
+ } else {
+ logx("%s: load from network failed, fallback to cache",
+ rr->basedir);
+ stats.rsync_fails++;
+ rr->state = REPO_FAILED;
+ }
+
+ SLIST_FOREACH(rp, &repos, entry)
+ if (rp->rsync == rr)
+ entityq_flush(&rp->queue, rp);
+}
+
+/*
+ * RRDP sync finshed, either with or without success.
+ */
+void
+rrdp_finish(size_t id, int ok)
+{
+ struct rrdprepo *rr;
+ struct repo *rp;
+
+ rr = rrdp_find(id);
+ if (rr == NULL)
+ errx(1, "unknown RRDP repo %zu", id);
+
+ if (ok) {
+ rrdp_merge_repo(rr);
+ logx("%s: loaded from network", rr->notifyuri);
+ rr->state = REPO_DONE;
+ stats.rrdp_repos++;
+ SLIST_FOREACH(rp, &repos, entry)
+ if (rp->rrdp == rr)
+ entityq_flush(&rp->queue, rp);
+ } else {
+ rrdp_clean_temp(rr);
+ stats.rrdp_fails++;
+ rr->state = REPO_FAILED;
+ logx("%s: load from network failed, fallback to rsync",
+ rr->notifyuri);
+ SLIST_FOREACH(rp, &repos, entry)
+ if (rp->rrdp == rr) {
+ rp->rrdp = NULL;
+ rp->rsync = rsync_get(rp->repouri);
+ /* need to check if it was already loaded */
+ if (repo_state(rp) != REPO_LOADING)
+ entityq_flush(&rp->queue, rp);
+ }
+ }
+}
+
+/*
+ * Handle responses from the http process. For TA file, either rename
+ * or delete the temporary file. For RRDP requests relay the request
+ * over to the rrdp process.
+ */
+void
+http_finish(size_t id, enum http_result res, const char *last_mod)
+{
+ struct tarepo *tr;
+ struct repo *rp;
+
+ tr = ta_find(id);
+ if (tr == NULL) {
+ /* not a TA fetch therefor RRDP */
+ rrdp_http_done(id, res, last_mod);
+ return;
+ }
+
+ /* Move downloaded TA file into place, or unlink on failure. */
+ if (res == HTTP_OK) {
+ char *file;
+
+ file = ta_filename(tr, 0);
+ if (rename(tr->temp, file) == -1)
+ warn("rename to %s", file);
+ free(file);
+
+ logx("ta/%s: loaded from network", tr->descr);
+ tr->state = REPO_DONE;
+ stats.http_repos++;
+ } else {
+ if (unlink(tr->temp) == -1)
+ warn("unlink %s", tr->temp);
+
+ if (++tr->uriidx < tr->urisz) {
+ logx("ta/%s: load from network failed, retry",
+ tr->descr);
+ ta_fetch(tr);
+ return;
+ }
+
+ tr->state = REPO_FAILED;
+ logx("ta/%s: load from network failed, "
+ "fallback to cache", tr->descr);
+ }
+
+ SLIST_FOREACH(rp, &repos, entry)
+ if (rp->ta == tr)
+ entityq_flush(&rp->queue, rp);
+}
+
+
+
+/*
+ * Look up a trust anchor, queueing it for download if not found.
+ */
+struct repo *
+ta_lookup(struct tal *tal)
+{
+ struct repo *rp;
+
+ /* Look up in repository table. (Lookup should actually fail here) */
+ SLIST_FOREACH(rp, &repos, entry) {
+ if (strcmp(rp->repouri, tal->descr) == 0)
+ return rp;
+ }
+
+ rp = repo_alloc();
+ if ((rp->repouri = strdup(tal->descr)) == NULL)
+ err(1, NULL);
+ rp->ta = ta_get(tal);
+
+ return rp;
+}
+
+/*
+ * Look up a repository, queueing it for discovery if not found.
+ */
+struct repo *
+repo_lookup(const char *uri, const char *notify)
+{
+ struct repo *rp;
+
+ /* Look up in repository table. */
+ SLIST_FOREACH(rp, &repos, entry) {
+ if (strcmp(rp->repouri, uri) != 0)
+ continue;
+ return rp;
+ }
+
+ rp = repo_alloc();
+ if ((rp->repouri = strdup(uri)) == NULL)
+ err(1, NULL);
+
+ /* try RRDP first if available */
+ if (notify != NULL)
+ rp->rrdp = rrdp_get(notify);
+ if (rp->rrdp == NULL)
+ rp->rsync = rsync_get(uri);
+
+ return rp;
+}
+
+/*
+ * Build local file name base on the URI and the repo info.
+ */
+char *
+repo_filename(const struct repo *rp, const char *uri)
+{
+ char *nfile;
+ char *dir, *repouri;
+
+ if (uri == NULL && rp->ta)
+ return ta_filename(rp->ta, 0);
+
+ assert(uri != NULL);
+ if (rp->rrdp)
+ return rrdp_filename(rp->rrdp, uri, 0);
+
+ /* must be rsync */
+ dir = rp->rsync->basedir;
+ repouri = rp->rsync->repouri;
+
+ if (strstr(uri, repouri) != uri) {
+ warnx("%s: URI %s outside of repository", repouri, uri);
+ return NULL;
+ }
+
+ uri += strlen(repouri) + 1; /* skip base and '/' */
+
+ if (asprintf(&nfile, "%s/%s", dir, uri) == -1)
+ err(1, NULL);
+ return nfile;
+}
+
+int
+repo_queued(struct repo *rp, struct entity *p)
+{
+ if (repo_state(rp) == REPO_LOADING) {
+ TAILQ_INSERT_TAIL(&rp->queue, p, entries);
+ return 1;
+ }
+ return 0;
+}
+
+static char **
+add_to_del(char **del, size_t *dsz, char *file)
+{
+ size_t i = *dsz;
+
+ del = reallocarray(del, i + 1, sizeof(*del));
+ if (del == NULL)
+ err(1, NULL);
+ if ((del[i] = strdup(file)) == NULL)
+ err(1, NULL);
+ *dsz = i + 1;
+ return del;
+}
+void
+repo_cleanup(struct filepath_tree *tree)
+{
+ size_t i, delsz = 0, dirsz = 0;
+ char **del = NULL, **dir = NULL;
+ char *argv[4];
+ FTS *fts;
+ FTSENT *e;
+
+ argv[0] = "ta";
+ argv[1] = "rsync";
+ argv[2] = "rrdp";
+ argv[3] = NULL;
+ if ((fts = fts_open(argv, FTS_PHYSICAL | FTS_NOSTAT, NULL)) == NULL)
+ err(1, "fts_open");
+ errno = 0;
+ while ((e = fts_read(fts)) != NULL) {
+ switch (e->fts_info) {
+ case FTS_NSOK:
+ if (!filepath_exists(tree, e->fts_path))
+ del = add_to_del(del, &delsz,
+ e->fts_path);
+ break;
+ case FTS_D:
+ /* skip rrdp base directories during cleanup */
+ if (rrdp_basedir(e->fts_path))
+ if (fts_set(fts, e, FTS_SKIP) == -1)
+ err(1, "fts_set");
+ break;
+ case FTS_DP:
+ if (!filepath_dir_exists(tree, e->fts_path))
+ dir = add_to_del(dir, &dirsz,
+ e->fts_path);
+ break;
+ case FTS_SL:
+ case FTS_SLNONE:
+ warnx("symlink %s", e->fts_path);
+ del = add_to_del(del, &delsz, e->fts_path);
+ break;
+ case FTS_NS:
+ case FTS_ERR:
+ if (e->fts_errno == ENOENT &&
+ (strcmp(e->fts_path, "rsync") == 0 ||
+ strcmp(e->fts_path, "rrdp") == 0))
+ continue;
+ warnx("fts_read %s: %s", e->fts_path,
+ strerror(e->fts_errno));
+ break;
+ default:
+ warnx("unhandled[%x] %s", e->fts_info,
+ e->fts_path);
+ break;
+ }
+
+ errno = 0;
+ }
+ if (errno)
+ err(1, "fts_read");
+ if (fts_close(fts) == -1)
+ err(1, "fts_close");
+
+ for (i = 0; i < delsz; i++) {
+ if (unlink(del[i]) == -1)
+ warn("unlink %s", del[i]);
+ if (verbose > 1)
+ logx("deleted %s", del[i]);
+ free(del[i]);
+ }
+ free(del);
+ stats.del_files = delsz;
+
+ for (i = 0; i < dirsz; i++) {
+ if (rmdir(dir[i]) == -1)
+ warn("rmdir %s", dir[i]);
+ if (verbose > 1)
+ logx("deleted dir %s", dir[i]);
+ free(dir[i]);
+ }
+ free(dir);
+ stats.del_dirs = dirsz;
+}
+
+void
+repo_free(void)
+{
+ struct repo *rp;
+
+ while ((rp = SLIST_FIRST(&repos)) != NULL) {
+ SLIST_REMOVE_HEAD(&repos, entry);
+ free(rp->repouri);
+ free(rp);
+ }
+
+ ta_free();
+ rrdp_free();
+ rsync_free();
+}
-.\" $OpenBSD: rpki-client.8,v 1.35 2021/03/29 15:04:28 deraadt Exp $
+.\" $OpenBSD: rpki-client.8,v 1.36 2021/04/01 16:04:48 claudio Exp $
.\"
.\" Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
.\"
.\" ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
.\" OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
.\"
-.Dd $Mdocdate: March 29 2021 $
+.Dd $Mdocdate: April 1 2021 $
.Dt RPKI-CLIENT 8
.Os
.Sh NAME
.Nd RPKI validator to support BGP Origin Validation
.Sh SYNOPSIS
.Nm
-.Op Fl BcjnoVv
+.Op Fl BcjnoRrVv
.Op Fl b Ar sourceaddr
.Op Fl d Ar cachedir
.Op Fl e Ar rsync_prog
.Nm
will load all TAL files in
.Pa /etc/rpki .
+.It Fl R
+Disable synchronisation via RRDP.
+.It Fl r
+Enable synchronisation via RRDP.
.It Fl V
Show the version and exit.
.It Fl v
--- /dev/null
+/* $OpenBSD: rrdp.c,v 1.1 2021/04/01 16:04:48 claudio Exp $ */
+/*
+ * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
+ * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+#include <sys/queue.h>
+#include <sys/stat.h>
+
+#include <assert.h>
+#include <ctype.h>
+#include <err.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <poll.h>
+#include <string.h>
+#include <unistd.h>
+#include <imsg.h>
+
+#include <expat.h>
+#include <openssl/sha.h>
+
+#include "extern.h"
+#include "rrdp.h"
+
+#define MAX_SESSIONS 12
+#define READ_BUF_SIZE (32 * 1024)
+
+static struct msgbuf msgq;
+
+#define RRDP_STATE_REQ 0x01
+#define RRDP_STATE_WAIT 0x02
+#define RRDP_STATE_PARSE 0x04
+#define RRDP_STATE_PARSE_ERROR 0x08
+#define RRDP_STATE_PARSE_DONE 0x10
+#define RRDP_STATE_HTTP_DONE 0x20
+#define RRDP_STATE_DONE (RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE)
+
+struct rrdp {
+ TAILQ_ENTRY(rrdp) entry;
+ size_t id;
+ char *notifyuri;
+ char *local;
+ char *last_mod;
+
+ struct pollfd *pfd;
+ int infd;
+ int state;
+ unsigned int file_pending;
+ unsigned int file_failed;
+ enum http_result res;
+ enum rrdp_task task;
+
+ char hash[SHA256_DIGEST_LENGTH];
+ SHA256_CTX ctx;
+
+ struct rrdp_session repository;
+ struct rrdp_session current;
+ XML_Parser parser;
+ struct notification_xml *nxml;
+ struct snapshot_xml *sxml;
+ struct delta_xml *dxml;
+};
+
+TAILQ_HEAD(,rrdp) states = TAILQ_HEAD_INITIALIZER(states);
+
+struct publish_xml {
+ char *uri;
+ char *data;
+ char hash[SHA256_DIGEST_LENGTH];
+ int data_length;
+ enum publish_type type;
+};
+
+char *
+xstrdup(const char *s)
+{
+ char *r;
+ if ((r = strdup(s)) == NULL)
+ err(1, "strdup");
+ return r;
+}
+
+/*
+ * Hex decode hexstring into the supplied buffer.
+ * Return 0 on success else -1, if buffer too small or bad encoding.
+ */
+int
+hex_decode(const char *hexstr, char *buf, size_t len)
+{
+ unsigned char ch, r;
+ size_t pos = 0;
+ int i;
+
+ while (*hexstr) {
+ r = 0;
+ for (i = 0; i < 2; i++) {
+ ch = hexstr[i];
+ if (isdigit(ch))
+ ch -= '0';
+ else if (islower(ch))
+ ch -= ('a' - 10);
+ else if (isupper(ch))
+ ch -= ('A' - 10);
+ else
+ return -1;
+ if (ch > 0xf)
+ return -1;
+ r = r << 4 | ch;
+ }
+ if (pos < len)
+ buf[pos++] = r;
+ else
+ return -1;
+
+ hexstr += 2;
+ }
+ return 0;
+}
+
+/*
+ * Report back that a RRDP request finished.
+ * ok should only be set to 1 if the cache is now up-to-date.
+ */
+static void
+rrdp_done(size_t id, int ok)
+{
+ enum rrdp_msg type = RRDP_END;
+ struct ibuf *b;
+
+ if ((b = ibuf_open(sizeof(type) + sizeof(id) + sizeof(ok))) == NULL)
+ err(1, NULL);
+ io_simple_buffer(b, &type, sizeof(type));
+ io_simple_buffer(b, &id, sizeof(id));
+ io_simple_buffer(b, &ok, sizeof(ok));
+ ibuf_close(&msgq, b);
+}
+
+/*
+ * Request an URI to be fetched via HTTPS.
+ * The main process will respond with a RRDP_HTTP_INI which includes
+ * the file descriptor to read from. RRDP_HTTP_FIN is sent at the
+ * end of the request with the HTTP status code and last modified timestamp.
+ * If the request should not set the If-Modified-Since: header then last_mod
+ * should be set to NULL, else it should point to a proper date string.
+ */
+static void
+rrdp_http_req(size_t id, const char *uri, const char *last_mod)
+{
+ enum rrdp_msg type = RRDP_HTTP_REQ;
+ struct ibuf *b;
+
+ if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
+ err(1, NULL);
+ io_simple_buffer(b, &type, sizeof(type));
+ io_simple_buffer(b, &id, sizeof(id));
+ io_str_buffer(b, uri);
+ io_str_buffer(b, last_mod);
+ ibuf_close(&msgq, b);
+}
+
+/*
+ * Send the session state to the main process so it gets stored.
+ */
+static void
+rrdp_state_send(struct rrdp *s)
+{
+ enum rrdp_msg type = RRDP_SESSION;
+ struct ibuf *b;
+
+ if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
+ err(1, NULL);
+ io_simple_buffer(b, &type, sizeof(type));
+ io_simple_buffer(b, &s->id, sizeof(s->id));
+ io_str_buffer(b, s->current.session_id);
+ io_simple_buffer(b, &s->current.serial, sizeof(s->current.serial));
+ io_str_buffer(b, s->current.last_mod);
+ ibuf_close(&msgq, b);
+}
+
+static struct rrdp *
+rrdp_new(size_t id, char *local, char *notify, char *session_id,
+ long long serial, char *last_mod)
+{
+ struct rrdp *s;
+
+ if ((s = calloc(1, sizeof(*s))) == NULL)
+ err(1, NULL);
+
+ s->infd = -1;
+ s->id = id;
+ s->local = local;
+ s->notifyuri = notify;
+ s->repository.session_id = session_id;
+ s->repository.serial = serial;
+ s->repository.last_mod = last_mod;
+
+ s->state = RRDP_STATE_REQ;
+ if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL)
+ err(1, "XML_ParserCreate");
+
+ s->nxml = new_notification_xml(s->parser, &s->repository, &s->current);
+
+ TAILQ_INSERT_TAIL(&states, s, entry);
+
+ return s;
+}
+
+static void
+rrdp_free(struct rrdp *s)
+{
+ if (s == NULL)
+ return;
+
+ TAILQ_REMOVE(&states, s, entry);
+
+ free_notification_xml(s->nxml);
+ free_snapshot_xml(s->sxml);
+ free_delta_xml(s->dxml);
+
+ if (s->infd != -1)
+ close(s->infd);
+ if (s->parser)
+ XML_ParserFree(s->parser);
+ free(s->notifyuri);
+ free(s->local);
+ free(s->last_mod);
+ free(s->repository.last_mod);
+ free(s->repository.session_id);
+ free(s->current.last_mod);
+ free(s->current.session_id);
+
+ free(s);
+}
+
+static struct rrdp *
+rrdp_get(size_t id)
+{
+ struct rrdp *s;
+
+ TAILQ_FOREACH(s, &states, entry)
+ if (s->id == id)
+ break;
+ return s;
+}
+
+static void
+rrdp_failed(struct rrdp *s)
+{
+ size_t id = s->id;
+
+ /* reset file state before retrying */
+ s->file_failed = 0;
+
+ /* XXX MUST do some cleanup in the repo here */
+ if (s->task == DELTA) {
+ /* fallback to a snapshot as per RFC8182 */
+ free_delta_xml(s->dxml);
+ s->dxml = NULL;
+ s->sxml = new_snapshot_xml(s->parser, &s->current, s);
+ s->task = SNAPSHOT;
+ s->state = RRDP_STATE_REQ;
+ } else {
+ /*
+ * TODO: update state to track recurring failures
+ * and fall back to rsync after a while.
+ * This should probably happen in the main process.
+ */
+ rrdp_free(s);
+ rrdp_done(id, 0);
+ }
+}
+
+static void
+rrdp_finished(struct rrdp *s)
+{
+ size_t id = s->id;
+
+ /* check if all parts of the process have finished */
+ if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE)
+ return;
+
+ /* still some files pending */
+ if (s->file_pending > 0)
+ return;
+
+ if (s->state & RRDP_STATE_PARSE_ERROR) {
+ warnx("%s: failed after XML parse error", s->local);
+ rrdp_failed(s);
+ return;
+ }
+
+ if (s->res == HTTP_OK) {
+ /*
+ * Finalize parsing on success to be sure that
+ * all of the XML is correct. Needs to be done here
+ * since the call would most probably fail for non
+ * successful data fetches.
+ */
+ if (XML_Parse(s->parser, NULL, 0, 1) != XML_STATUS_OK) {
+ warnx("%s: XML error at line %lu: %s",
+ s->local,
+ XML_GetCurrentLineNumber(s->parser),
+ XML_ErrorString(XML_GetErrorCode(s->parser))
+ );
+ rrdp_failed(s);
+ return;
+ }
+
+ /* If a file caused an error fail the update */
+ if (s->file_failed > 0) {
+ rrdp_failed(s);
+ return;
+ }
+
+ switch (s->task) {
+ case NOTIFICATION:
+ s->task = notification_done(s->nxml, s->last_mod);
+ s->last_mod = NULL;
+ switch (s->task) {
+ case NOTIFICATION:
+ warnx("%s: repository not modified",
+ s->local);
+ rrdp_state_send(s);
+ rrdp_free(s);
+ rrdp_done(id, 1);
+ break;
+ case SNAPSHOT:
+ warnx("%s: downloading snapshot",
+ s->local);
+ s->sxml = new_snapshot_xml(s->parser,
+ &s->current, s);
+ s->state = RRDP_STATE_REQ;
+ break;
+ case DELTA:
+ warnx("%s: downloading %lld deltas",
+ s->local, s->repository.serial -
+ s->current.serial);
+ s->dxml = new_delta_xml(s->parser,
+ &s->current, s);
+ s->state = RRDP_STATE_REQ;
+ break;
+ }
+ break;
+ case SNAPSHOT:
+ rrdp_state_send(s);
+ rrdp_free(s);
+ rrdp_done(id, 1);
+ break;
+ case DELTA:
+ if (notification_delta_done(s->nxml)) {
+ /* finished */
+ rrdp_state_send(s);
+ rrdp_free(s);
+ rrdp_done(id, 1);
+ } else {
+ /* reset delta parser for next delta */
+ free_delta_xml(s->dxml);
+ s->dxml = new_delta_xml(s->parser,
+ &s->current, s);
+ s->state = RRDP_STATE_REQ;
+ }
+ break;
+ }
+ } else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) {
+ warnx("%s: notification file not modified", s->local);
+ /* no need to update state file */
+ rrdp_free(s);
+ rrdp_done(id, 1);
+ } else {
+ warnx("%s: HTTP request failed", s->local);
+ rrdp_failed(s);
+ }
+}
+
+static void
+rrdp_input_handler(int fd)
+{
+ char *local, *notify, *session_id, *last_mod;
+ struct rrdp *s;
+ enum rrdp_msg type;
+ enum http_result res;
+ long long serial;
+ size_t id;
+ int infd, ok;
+
+ infd = io_recvfd(fd, &type, sizeof(type));
+ io_simple_read(fd, &id, sizeof(id));
+
+ switch (type) {
+ case RRDP_START:
+ io_str_read(fd, &local);
+ io_str_read(fd, ¬ify);
+ io_str_read(fd, &session_id);
+ io_simple_read(fd, &serial, sizeof(serial));
+ io_str_read(fd, &last_mod);
+ if (infd != -1)
+ errx(1, "received unexpected fd %d", infd);
+
+ s = rrdp_new(id, local, notify, session_id, serial, last_mod);
+ break;
+ case RRDP_HTTP_INI:
+ if (infd == -1)
+ errx(1, "expected fd not received");
+ s = rrdp_get(id);
+ if (s == NULL)
+ errx(1, "rrdp session %zu does not exist", id);
+ if (s->state != RRDP_STATE_WAIT)
+ errx(1, "%s: bad internal state", s->local);
+
+ s->infd = infd;
+ s->state = RRDP_STATE_PARSE;
+ break;
+ case RRDP_HTTP_FIN:
+ io_simple_read(fd, &res, sizeof(res));
+ io_str_read(fd, &last_mod);
+ if (infd != -1)
+ errx(1, "received unexpected fd");
+
+ s = rrdp_get(id);
+ if (s == NULL)
+ errx(1, "rrdp session %zu does not exist", id);
+ if (!(s->state & RRDP_STATE_PARSE))
+ errx(1, "%s: bad internal state", s->local);
+
+ s->res = res;
+ s->last_mod = last_mod;
+ s->state |= RRDP_STATE_HTTP_DONE;
+ rrdp_finished(s);
+ break;
+ case RRDP_FILE:
+ s = rrdp_get(id);
+ if (s == NULL)
+ errx(1, "rrdp session %zu does not exist", id);
+ if (infd != -1)
+ errx(1, "received unexpected fd %d", infd);
+ io_simple_read(fd, &ok, sizeof(ok));
+ if (ok == 0)
+ s->file_failed++;
+ s->file_pending--;
+ if (s->file_pending == 0)
+ rrdp_finished(s);
+ break;
+ default:
+ errx(1, "unexpected message %d", type);
+ }
+}
+
+static void
+rrdp_data_handler(struct rrdp *s)
+{
+ char buf[READ_BUF_SIZE];
+ XML_Parser p = s->parser;
+ ssize_t len;
+
+ len = read(s->infd, buf, sizeof(buf));
+ if (len == -1) {
+ s->state |= RRDP_STATE_PARSE_ERROR;
+ warn("%s: read failure", s->local);
+ return;
+ }
+ if ((s->state & RRDP_STATE_PARSE) == 0)
+ errx(1, "%s: bad parser state", s->local);
+ if (len == 0) {
+ /* parser stage finished */
+ close(s->infd);
+ s->infd = -1;
+
+ if (s->task != NOTIFICATION) {
+ char h[SHA256_DIGEST_LENGTH];
+
+ SHA256_Final(h, &s->ctx);
+ if (memcmp(s->hash, h, sizeof(s->hash)) != 0) {
+ s->state |= RRDP_STATE_PARSE_ERROR;
+ warnx("%s: bad message digest", s->local);
+ return;
+ }
+ }
+
+ s->state |= RRDP_STATE_PARSE_DONE;
+ rrdp_finished(s);
+ return;
+ }
+
+ /* parse and maybe hash the bytes just read */
+ if (s->task != NOTIFICATION)
+ SHA256_Update(&s->ctx, buf, len);
+ if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 &&
+ XML_Parse(p, buf, len, 0) != XML_STATUS_OK) {
+ s->state |= RRDP_STATE_PARSE_ERROR;
+ warnx("%s: parse error at line %lu: %s", s->local,
+ XML_GetCurrentLineNumber(p),
+ XML_ErrorString(XML_GetErrorCode(p)));
+ }
+}
+
+void
+proc_rrdp(int fd)
+{
+ struct pollfd pfds[MAX_SESSIONS + 1];
+ struct rrdp *s, *ns;
+ size_t i;
+
+ if (pledge("stdio recvfd", NULL) == -1)
+ err(1, "pledge");
+
+ memset(&pfds, 0, sizeof(pfds));
+
+ msgbuf_init(&msgq);
+ msgq.fd = fd;
+
+ for (;;) {
+ i = 1;
+ TAILQ_FOREACH(s, &states, entry) {
+ if (i >= MAX_SESSIONS + 1) {
+ /* not enough sessions, wait for better times */
+ s->pfd = NULL;
+ continue;
+ }
+ /* request new assets when there are free sessions */
+ if (s->state == RRDP_STATE_REQ) {
+ const char *uri;
+ switch (s->task) {
+ case NOTIFICATION:
+ rrdp_http_req(s->id, s->notifyuri,
+ s->repository.last_mod);
+ break;
+ case SNAPSHOT:
+ case DELTA:
+ uri = notification_get_next(s->nxml,
+ s->hash, sizeof(s->hash),
+ s->task);
+ SHA256_Init(&s->ctx);
+ rrdp_http_req(s->id, uri, NULL);
+ break;
+ }
+ s->state = RRDP_STATE_WAIT;
+ }
+ s->pfd = pfds + i++;
+ s->pfd->fd = s->infd;
+ s->pfd->events = POLLIN;
+ }
+
+ /*
+ * Update main fd last.
+ * The previous loop may have enqueue messages.
+ */
+ pfds[0].fd = fd;
+ pfds[0].events = POLLIN;
+ if (msgq.queued)
+ pfds[0].events |= POLLOUT;
+
+ if (poll(pfds, i, INFTIM) == -1)
+ err(1, "poll");
+
+ if (pfds[0].revents & POLLHUP)
+ break;
+ if (pfds[0].revents & POLLOUT) {
+ io_socket_nonblocking(fd);
+ switch (msgbuf_write(&msgq)) {
+ case 0:
+ errx(1, "write: connection closed");
+ case -1:
+ err(1, "write");
+ }
+ io_socket_blocking(fd);
+ }
+ if (pfds[0].revents & POLLIN)
+ rrdp_input_handler(fd);
+
+ TAILQ_FOREACH_SAFE(s, &states, entry, ns) {
+ if (s->pfd == NULL)
+ continue;
+ if (s->pfd->revents & POLLIN)
+ rrdp_data_handler(s);
+ }
+ }
+
+ exit(0);
+}
+
+/*
+ * Both snapshots and deltas use publish_xml to store the publish and
+ * withdraw records. Once all the content is added the request is sent
+ * to the main process where it is processed.
+ */
+struct publish_xml *
+new_publish_xml(enum publish_type type, char *uri, char *hash, size_t hlen)
+{
+ struct publish_xml *pxml;
+
+ if ((pxml = calloc(1, sizeof(*pxml))) == NULL)
+ err(1, "%s", __func__);
+
+ pxml->type = type;
+ pxml->uri = uri;
+ if (hlen > 0) {
+ assert(hlen == sizeof(pxml->hash));
+ memcpy(pxml->hash, hash, hlen);
+ }
+
+ return pxml;
+}
+
+void
+free_publish_xml(struct publish_xml *pxml)
+{
+ if (pxml == NULL)
+ return;
+
+ free(pxml->uri);
+ free(pxml->data);
+ free(pxml);
+}
+
+/*
+ * Add buf to the base64 data string, ensure that this remains a proper
+ * string by NUL-terminating the string.
+ */
+void
+publish_add_content(struct publish_xml *pxml, const char *buf, int length)
+{
+ int new_length;
+
+ /*
+ * optmisiation, this often gets called with '\n' as the
+ * only data... seems wasteful
+ */
+ if (length == 1 && buf[0] == '\n')
+ return;
+
+ /* append content to data */
+ new_length = pxml->data_length + length;
+ pxml->data = realloc(pxml->data, new_length + 1);
+ if (pxml->data == NULL)
+ err(1, "%s", __func__);
+
+ memcpy(pxml->data + pxml->data_length, buf, length);
+ pxml->data[new_length] = '\0';
+ pxml->data_length = new_length;
+}
+
+/*
+ * Base64 decode the data blob and send the file to the main process
+ * where the hash is validated and the file stored in the repository.
+ * Increase the file_pending counter to ensure the RRDP process waits
+ * until all files have been processed before moving to the next stage.
+ * Returns 0 on success or -1 on errors (base64 decode failed).
+ */
+int
+publish_done(struct rrdp *s, struct publish_xml *pxml)
+{
+ enum rrdp_msg type = RRDP_FILE;
+ struct ibuf *b;
+ unsigned char *data = NULL;
+ size_t datasz = 0;
+
+ if (pxml->data_length > 0)
+ if ((base64_decode(pxml->data, &data, &datasz)) == -1)
+ return -1;
+
+ if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
+ err(1, NULL);
+ io_simple_buffer(b, &type, sizeof(type));
+ io_simple_buffer(b, &s->id, sizeof(s->id));
+ io_simple_buffer(b, &pxml->type, sizeof(pxml->type));
+ if (pxml->type != PUB_ADD)
+ io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash));
+ io_str_buffer(b, pxml->uri);
+ io_buf_buffer(b, data, datasz);
+ ibuf_close(&msgq, b);
+ s->file_pending++;
+
+ free(data);
+ free_publish_xml(pxml);
+ return 0;
+}
--- /dev/null
+#ifndef _RRDPH_
+#define _RRDPH_
+
+#define MAX_VERSION 1
+
+#define log_debuginfo(format, ...) logx(format, ##__VA_ARGS__)
+
+/* save everyone doing this code over and over */
+#define PARSE_FAIL(p, ...) do { \
+ XML_StopParser(p, XML_FALSE); \
+ warnx(__VA_ARGS__); \
+ return; \
+} while(0)
+
+enum rrdp_task {
+ NOTIFICATION,
+ SNAPSHOT,
+ DELTA,
+};
+
+/* rrdp generic */
+char *xstrdup(const char *);
+int hex_decode(const char *, char *, size_t);
+
+/* publish or withdraw element */
+struct rrdp;
+struct publish_xml;
+
+struct publish_xml *new_publish_xml(enum publish_type, char *,
+ char *, size_t);
+void free_publish_xml(struct publish_xml *);
+void publish_add_content(struct publish_xml *,
+ const char *, int);
+int publish_done(struct rrdp *, struct publish_xml *);
+
+/* notification */
+struct notification_xml;
+
+struct notification_xml *new_notification_xml(XML_Parser,
+ struct rrdp_session *, struct rrdp_session *);
+void free_notification_xml(struct notification_xml *);
+enum rrdp_task notification_done(struct notification_xml *,
+ char *);
+const char *notification_get_next(struct notification_xml *,
+ char *, size_t, enum rrdp_task);
+int notification_delta_done(struct notification_xml *);
+void log_notification_xml(struct notification_xml *);
+
+/* snapshot */
+struct snapshot_xml;
+
+struct snapshot_xml *new_snapshot_xml(XML_Parser, struct rrdp_session *,
+ struct rrdp *);
+void free_snapshot_xml(struct snapshot_xml *);
+void log_snapshot_xml(struct snapshot_xml *);
+
+/* delta */
+struct delta_xml;
+
+struct delta_xml *new_delta_xml(XML_Parser, struct rrdp_session *,
+ struct rrdp *);
+void free_delta_xml(struct delta_xml *);
+void log_delta_xml(struct delta_xml *);
+
+#endif /* _RRDPH_ */
--- /dev/null
+/*
+ * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
+ * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <err.h>
+#include <limits.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <expat.h>
+#include <openssl/sha.h>
+
+#include "extern.h"
+#include "rrdp.h"
+
+enum delta_scope {
+ DELTA_SCOPE_NONE,
+ DELTA_SCOPE_DELTA,
+ DELTA_SCOPE_PUBLISH,
+ DELTA_SCOPE_END
+};
+
+struct delta_xml {
+ XML_Parser parser;
+ struct rrdp_session *current;
+ struct rrdp *rrdp;
+ struct publish_xml *pxml;
+ char *session_id;
+ long long serial;
+ int version;
+ enum delta_scope scope;
+};
+
+enum validate_return {
+ VALIDATE_RETURN_NO_FILE,
+ VALIDATE_RETURN_FILE_DEL,
+ VALIDATE_RETURN_HASH_MISMATCH,
+ VALIDATE_RETURN_HASH_MATCH
+};
+
+static void
+start_delta_elem(struct delta_xml *dxml, const char **attr)
+{
+ XML_Parser p = dxml->parser;
+ int has_xmlns = 0;
+ int i;
+
+ if (dxml->scope != DELTA_SCOPE_NONE)
+ PARSE_FAIL(p,
+ "parse failed - entered delta elem unexpectedely");
+ for (i = 0; attr[i]; i += 2) {
+ const char *errstr;
+ if (strcmp("xmlns", attr[i]) == 0) {
+ has_xmlns = 1;
+ continue;
+ }
+ if (strcmp("version", attr[i]) == 0) {
+ dxml->version = strtonum(attr[i + 1],
+ 1, MAX_VERSION, &errstr);
+ if (errstr == NULL)
+ continue;
+ }
+ if (strcmp("session_id", attr[i]) == 0) {
+ dxml->session_id = xstrdup(attr[i + 1]);
+ continue;
+ }
+ if (strcmp("serial", attr[i]) == 0) {
+ dxml->serial = strtonum(attr[i + 1],
+ 1, LLONG_MAX, &errstr);
+ if (errstr == NULL)
+ continue;
+ }
+ PARSE_FAIL(p, "parse failed - non conforming "
+ "attribute found in delta elem");
+ }
+ if (!(has_xmlns && dxml->version && dxml->session_id && dxml->serial))
+ PARSE_FAIL(p, "parse failed - incomplete delta attributes");
+ if (strcmp(dxml->current->session_id, dxml->session_id) != 0)
+ PARSE_FAIL(p, "parse failed - session_id mismatch");
+ if (dxml->current->serial != dxml->serial)
+ PARSE_FAIL(p, "parse failed - serial mismatch");
+
+ dxml->scope = DELTA_SCOPE_DELTA;
+}
+
+static void
+end_delta_elem(struct delta_xml *dxml)
+{
+ XML_Parser p = dxml->parser;
+
+ if (dxml->scope != DELTA_SCOPE_DELTA)
+ PARSE_FAIL(p, "parse failed - exited delta "
+ "elem unexpectedely");
+ dxml->scope = DELTA_SCOPE_END;
+
+}
+
+static void
+start_publish_withdraw_elem(struct delta_xml *dxml, const char **attr,
+ int withdraw)
+{
+ XML_Parser p = dxml->parser;
+ char *uri, hash[SHA256_DIGEST_LENGTH];
+ int i, hasUri = 0, hasHash = 0;
+ enum publish_type pub = PUB_UPD;
+
+ if (dxml->scope != DELTA_SCOPE_DELTA)
+ PARSE_FAIL(p, "parse failed - entered publish/withdraw "
+ "elem unexpectedely");
+ for (i = 0; attr[i]; i += 2) {
+ if (strcmp("uri", attr[i]) == 0 && hasUri++ == 0) {
+ if (valid_uri(attr[i + 1], strlen(attr[i + 1]),
+ "rsync://")) {
+ uri = xstrdup(attr[i + 1]);
+ continue;
+ }
+ }
+ if (strcmp("hash", attr[i]) == 0 && hasHash++ == 0) {
+ if (hex_decode(attr[i + 1], hash, sizeof(hash)) == 0)
+ continue;
+ }
+ PARSE_FAIL(p, "parse failed - non conforming "
+ "attribute found in publish/withdraw elem");
+ }
+ if (hasUri != 1)
+ PARSE_FAIL(p,
+ "parse failed - incomplete publish/withdraw attributes");
+ if (withdraw && hasHash != 1)
+ PARSE_FAIL(p, "parse failed - incomplete withdraw attributes");
+
+ if (withdraw)
+ pub = PUB_DEL;
+ else if (hasHash == 0)
+ pub = PUB_ADD;
+ dxml->pxml = new_publish_xml(pub, uri, hash,
+ hasHash ? sizeof(hash) : 0);
+ dxml->scope = DELTA_SCOPE_PUBLISH;
+}
+
+static void
+end_publish_withdraw_elem(struct delta_xml *dxml, int withdraw)
+{
+ XML_Parser p = dxml->parser;
+
+ if (dxml->scope != DELTA_SCOPE_PUBLISH)
+ PARSE_FAIL(p, "parse failed - exited publish/withdraw "
+ "elem unexpectedely");
+
+ if (publish_done(dxml->rrdp, dxml->pxml) != 0)
+ PARSE_FAIL(p, "parse failed - bad publish/withdraw elem");
+ dxml->pxml = NULL;
+
+ dxml->scope = DELTA_SCOPE_DELTA;
+}
+
+static void
+delta_xml_elem_start(void *data, const char *el, const char **attr)
+{
+ struct delta_xml *dxml = data;
+ XML_Parser p = dxml->parser;
+
+ /*
+ * Can only enter here once as we should have no ways to get back to
+ * NONE scope
+ */
+ if (strcmp("delta", el) == 0)
+ start_delta_elem(dxml, attr);
+ /*
+ * Will enter here multiple times, BUT never nested. will start
+ * collecting character data in that handler
+ * mem is cleared in end block, (TODO or on parse failure)
+ */
+ else if (strcmp("publish", el) == 0)
+ start_publish_withdraw_elem(dxml, attr, 0);
+ else if (strcmp("withdraw", el) == 0)
+ start_publish_withdraw_elem(dxml, attr, 1);
+ else
+ PARSE_FAIL(p, "parse failed - unexpected elem exit found");
+}
+
+static void
+delta_xml_elem_end(void *data, const char *el)
+{
+ struct delta_xml *dxml = data;
+ XML_Parser p = dxml->parser;
+
+ if (strcmp("delta", el) == 0)
+ end_delta_elem(dxml);
+ /*
+ * TODO does this allow <publish></withdraw> or is that caught by basic
+ * xml parsing
+ */
+ else if (strcmp("publish", el) == 0)
+ end_publish_withdraw_elem(dxml, 0);
+ else if (strcmp("withdraw", el) == 0)
+ end_publish_withdraw_elem(dxml, 1);
+ else
+ PARSE_FAIL(p, "parse failed - unexpected elem exit found");
+}
+
+static void
+delta_content_handler(void *data, const char *content, int length)
+{
+ struct delta_xml *dxml = data;
+
+ if (dxml->scope == DELTA_SCOPE_PUBLISH)
+ publish_add_content(dxml->pxml, content, length);
+}
+
+struct delta_xml *
+new_delta_xml(XML_Parser p, struct rrdp_session *rs, struct rrdp *r)
+{
+ struct delta_xml *dxml;
+
+ if ((dxml = calloc(1, sizeof(*dxml))) == NULL)
+ err(1, "%s", __func__);
+ dxml->parser = p;
+ dxml->current = rs;
+ dxml->rrdp = r;
+
+ if (XML_ParserReset(dxml->parser, "US-ASCII") != XML_TRUE)
+ errx(1, "%s: XML_ParserReset failed", __func__);
+
+ XML_SetElementHandler(dxml->parser, delta_xml_elem_start,
+ delta_xml_elem_end);
+ XML_SetCharacterDataHandler(dxml->parser, delta_content_handler);
+ XML_SetUserData(dxml->parser, dxml);
+
+ return dxml;
+}
+
+void
+free_delta_xml(struct delta_xml *dxml)
+{
+ if (dxml == NULL)
+ return;
+
+ free(dxml->session_id);
+ free_publish_xml(dxml->pxml);
+ free(dxml);
+}
+
+void
+log_delta_xml(struct delta_xml *dxml)
+{
+ logx("version: %d", dxml->version);
+ logx("session_id: %s serial: %lld", dxml->session_id, dxml->serial);
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
+ * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/stat.h>
+
+#include <assert.h>
+#include <err.h>
+#include <errno.h>
+#include <limits.h>
+#include <fcntl.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <expat.h>
+#include <openssl/sha.h>
+
+#include "extern.h"
+#include "rrdp.h"
+
+enum notification_scope {
+ NOTIFICATION_SCOPE_START,
+ NOTIFICATION_SCOPE_NOTIFICATION,
+ NOTIFICATION_SCOPE_SNAPSHOT,
+ NOTIFICATION_SCOPE_NOTIFICATION_POST_SNAPSHOT,
+ NOTIFICATION_SCOPE_DELTA,
+ NOTIFICATION_SCOPE_END
+};
+
+struct delta_item {
+ char *uri;
+ char hash[SHA256_DIGEST_LENGTH];
+ long long serial;
+ TAILQ_ENTRY(delta_item) q;
+};
+
+TAILQ_HEAD(delta_q, delta_item);
+
+struct notification_xml {
+ XML_Parser parser;
+ struct rrdp_session *repository;
+ struct rrdp_session *current;
+ char *session_id;
+ char *snapshot_uri;
+ char snapshot_hash[SHA256_DIGEST_LENGTH];
+ struct delta_q delta_q;
+ long long serial;
+ int version;
+ enum notification_scope scope;
+};
+
+static int
+add_delta(struct notification_xml *nxml, const char *uri,
+ const char hash[SHA256_DIGEST_LENGTH], long long serial)
+{
+ struct delta_item *d, *n;
+
+ if ((d = calloc(1, sizeof(struct delta_item))) == NULL)
+ err(1, "%s - calloc", __func__);
+
+ d->serial = serial;
+ d->uri = xstrdup(uri);
+ memcpy(d->hash, hash, sizeof(d->hash));
+
+ /* optimise for a sorted input */
+ n = TAILQ_LAST(&nxml->delta_q, delta_q);
+ if (n == NULL)
+ TAILQ_INSERT_HEAD(&nxml->delta_q, d, q);
+ else if (n->serial < serial)
+ TAILQ_INSERT_TAIL(&nxml->delta_q, d, q);
+ else
+ TAILQ_FOREACH(n, &nxml->delta_q, q) {
+ if (n->serial == serial) {
+ warnx("duplicate delta serial %lld ", serial);
+ free(d);
+ return 0;
+ }
+ if (n->serial > serial) {
+ TAILQ_INSERT_BEFORE(n, d, q);
+ break;
+ }
+ }
+
+ return 1;
+}
+
+static void
+free_delta(struct delta_item *d)
+{
+ free(d->uri);
+ free(d);
+}
+
+static void
+start_notification_elem(struct notification_xml *nxml, const char **attr)
+{
+ XML_Parser p = nxml->parser;
+ int has_xmlns = 0;
+ size_t i;
+
+ if (nxml->scope != NOTIFICATION_SCOPE_START)
+ PARSE_FAIL(p,
+ "parse failed - entered notification elem unexpectedely");
+ for (i = 0; attr[i]; i += 2) {
+ const char *errstr;
+ if (strcmp("xmlns", attr[i]) == 0) {
+ has_xmlns = 1;
+ continue;
+ }
+ if (strcmp("session_id", attr[i]) == 0) {
+ nxml->session_id = xstrdup(attr[i + 1]);
+ continue;
+ }
+ if (strcmp("version", attr[i]) == 0) {
+ nxml->version = strtonum(attr[i + 1],
+ 1, MAX_VERSION, &errstr);
+ if (errstr == NULL)
+ continue;
+ }
+ if (strcmp("serial", attr[i]) == 0) {
+ nxml->serial = strtonum(attr[i + 1],
+ 1, LLONG_MAX, &errstr);
+ if (errstr == NULL)
+ continue;
+ }
+ PARSE_FAIL(p, "parse failed - non conforming "
+ "attribute found in notification elem");
+ }
+ if (!(has_xmlns && nxml->version && nxml->session_id && nxml->serial))
+ PARSE_FAIL(p, "parse failed - incomplete "
+ "notification attributes");
+
+ nxml->scope = NOTIFICATION_SCOPE_NOTIFICATION;
+}
+
+static void
+end_notification_elem(struct notification_xml *nxml)
+{
+ XML_Parser p = nxml->parser;
+
+ if (nxml->scope != NOTIFICATION_SCOPE_NOTIFICATION_POST_SNAPSHOT)
+ PARSE_FAIL(p, "parse failed - exited notification "
+ "elem unexpectedely");
+ nxml->scope = NOTIFICATION_SCOPE_END;
+}
+
+static void
+start_snapshot_elem(struct notification_xml *nxml, const char **attr)
+{
+ XML_Parser p = nxml->parser;
+ int i, hasUri = 0, hasHash = 0;
+
+ if (nxml->scope != NOTIFICATION_SCOPE_NOTIFICATION)
+ PARSE_FAIL(p,
+ "parse failed - entered snapshot elem unexpectedely");
+ for (i = 0; attr[i]; i += 2) {
+ if (strcmp("uri", attr[i]) == 0 && hasUri++ == 0) {
+ if (valid_uri(attr[i + 1], strlen(attr[i + 1]),
+ "https://")) {
+ nxml->snapshot_uri = xstrdup(attr[i + 1]);
+ continue;
+ }
+ }
+ if (strcmp("hash", attr[i]) == 0 && hasHash++ == 0) {
+ if (hex_decode(attr[i + 1], nxml->snapshot_hash,
+ sizeof(nxml->snapshot_hash)) == 0)
+ continue;
+ }
+ PARSE_FAIL(p, "parse failed - non conforming "
+ "attribute found in snapshot elem");
+ }
+ if (hasUri != 1 || hasHash != 1)
+ PARSE_FAIL(p, "parse failed - incomplete snapshot attributes");
+
+ nxml->scope = NOTIFICATION_SCOPE_SNAPSHOT;
+}
+
+static void
+end_snapshot_elem(struct notification_xml *nxml)
+{
+ XML_Parser p = nxml->parser;
+
+ if (nxml->scope != NOTIFICATION_SCOPE_SNAPSHOT)
+ PARSE_FAIL(p, "parse failed - exited snapshot "
+ "elem unexpectedely");
+ nxml->scope = NOTIFICATION_SCOPE_NOTIFICATION_POST_SNAPSHOT;
+}
+
+static void
+start_delta_elem(struct notification_xml *nxml, const char **attr)
+{
+ XML_Parser p = nxml->parser;
+ int i, hasUri = 0, hasHash = 0;
+ const char *delta_uri = NULL;
+ char delta_hash[SHA256_DIGEST_LENGTH];
+ long long delta_serial = 0;
+
+ if (nxml->scope != NOTIFICATION_SCOPE_NOTIFICATION_POST_SNAPSHOT)
+ PARSE_FAIL(p, "parse failed - entered delta "
+ "elem unexpectedely");
+ for (i = 0; attr[i]; i += 2) {
+ if (strcmp("uri", attr[i]) == 0 && hasUri++ == 0) {
+ if (valid_uri(attr[i + 1], strlen(attr[i + 1]),
+ "https://")) {
+ delta_uri = attr[i + 1];
+ continue;
+ }
+ }
+ if (strcmp("hash", attr[i]) == 0 && hasHash++ == 0) {
+ if (hex_decode(attr[i + 1], delta_hash,
+ sizeof(delta_hash)) == 0)
+ continue;
+ }
+ if (strcmp("serial", attr[i]) == 0 && delta_serial == 0) {
+ const char *errstr;
+
+ delta_serial = strtonum(attr[i + 1],
+ 1, LLONG_MAX, &errstr);
+ if (errstr == NULL)
+ continue;
+ }
+ PARSE_FAIL(p, "parse failed - non conforming "
+ "attribute found in snapshot elem");
+ }
+ /* Only add to the list if we are relevant */
+ if (hasUri != 1 || hasHash != 1 || delta_serial == 0)
+ PARSE_FAIL(p, "parse failed - incomplete delta attributes");
+
+ /* optimisation, add only deltas that could be interesting */
+ if (nxml->repository->serial != 0 &&
+ nxml->repository->serial < delta_serial) {
+ if (add_delta(nxml, delta_uri, delta_hash, delta_serial) == 0)
+ PARSE_FAIL(p, "parse failed - adding delta failed");
+ }
+
+ nxml->scope = NOTIFICATION_SCOPE_DELTA;
+}
+
+static void
+end_delta_elem(struct notification_xml *nxml)
+{
+ XML_Parser p = nxml->parser;
+
+ if (nxml->scope != NOTIFICATION_SCOPE_DELTA)
+ PARSE_FAIL(p, "parse failed - exited delta elem unexpectedely");
+ nxml->scope = NOTIFICATION_SCOPE_NOTIFICATION_POST_SNAPSHOT;
+}
+
+static void
+notification_xml_elem_start(void *data, const char *el, const char **attr)
+{
+ struct notification_xml *nxml = data;
+ XML_Parser p = nxml->parser;
+
+ /*
+ * Can only enter here once as we should have no ways to get back to
+ * START scope
+ */
+ if (strcmp("notification", el) == 0)
+ start_notification_elem(nxml, attr);
+ /*
+ * Will enter here multiple times, BUT never nested. will start
+ * collecting character data in that handler
+ * mem is cleared in end block, (TODO or on parse failure)
+ */
+ else if (strcmp("snapshot", el) == 0)
+ start_snapshot_elem(nxml, attr);
+ else if (strcmp("delta", el) == 0)
+ start_delta_elem(nxml, attr);
+ else
+ PARSE_FAIL(p, "parse failed - unexpected elem exit found");
+}
+
+static void
+notification_xml_elem_end(void *data, const char *el)
+{
+ struct notification_xml *nxml = data;
+ XML_Parser p = nxml->parser;
+
+ if (strcmp("notification", el) == 0)
+ end_notification_elem(nxml);
+ else if (strcmp("snapshot", el) == 0)
+ end_snapshot_elem(nxml);
+ else if (strcmp("delta", el) == 0)
+ end_delta_elem(nxml);
+ else
+ PARSE_FAIL(p, "parse failed - unexpected elem exit found");
+}
+
+struct notification_xml *
+new_notification_xml(XML_Parser p, struct rrdp_session *repository,
+ struct rrdp_session *current)
+{
+ struct notification_xml *nxml;
+
+ if ((nxml = calloc(1, sizeof(*nxml))) == NULL)
+ err(1, "%s", __func__);
+ TAILQ_INIT(&(nxml->delta_q));
+ nxml->parser = p;
+ nxml->repository = repository;
+ nxml->current = current;
+
+ XML_SetElementHandler(nxml->parser, notification_xml_elem_start,
+ notification_xml_elem_end);
+ XML_SetUserData(nxml->parser, nxml);
+
+ return nxml;
+}
+
+void
+free_notification_xml(struct notification_xml *nxml)
+{
+ if (nxml == NULL)
+ return;
+
+ free(nxml->session_id);
+ free(nxml->snapshot_uri);
+ while (!TAILQ_EMPTY(&nxml->delta_q)) {
+ struct delta_item *d = TAILQ_FIRST(&nxml->delta_q);
+ TAILQ_REMOVE(&nxml->delta_q, d, q);
+ free_delta(d);
+ }
+ free(nxml);
+}
+
+/*
+ * Finalize notification step, decide if a delta update is possible
+ * if either the session_id changed or the delta files fail to cover
+ * all the steps up to the new serial fall back to a snapshot.
+ * Return SNAPSHOT or DELTA for snapshot or delta processing.
+ * Return NOTIFICATION if repository is up to date.
+ */
+enum rrdp_task
+notification_done(struct notification_xml *nxml, char *last_mod)
+{
+ struct delta_item *d;
+ long long s, last_s;
+
+ nxml->current->last_mod = last_mod;
+ nxml->current->session_id = xstrdup(nxml->session_id);
+
+ /* check the that the session_id was valid and still the same */
+ if (nxml->repository->session_id == NULL ||
+ strcmp(nxml->session_id, nxml->repository->session_id) != 0)
+ goto snapshot;
+
+ /* if repository serial is 0 fall back to snapshot */
+ if (nxml->repository->serial == 0)
+ goto snapshot;
+
+ if (nxml->repository->serial == nxml->serial) {
+ nxml->current->serial = nxml->serial;
+ return NOTIFICATION;
+ }
+
+ /* check that all needed deltas are available */
+ s = nxml->repository->serial + 1;
+ TAILQ_FOREACH(d, &nxml->delta_q, q) {
+ if (d->serial != s++)
+ goto snapshot;
+ last_s = d->serial;
+ }
+ if (last_s != nxml->serial)
+ goto snapshot;
+
+ /* update via delta possible */
+ nxml->current->serial = nxml->repository->serial;
+ nxml->repository->serial = nxml->serial;
+ return DELTA;
+
+snapshot:
+ /* update via snapshot download */
+ nxml->current->serial = nxml->serial;
+ return SNAPSHOT;
+}
+
+const char *
+notification_get_next(struct notification_xml *nxml, char *hash, size_t hlen,
+ enum rrdp_task task)
+{
+ struct delta_item *d;
+
+ switch (task) {
+ case SNAPSHOT:
+ assert(hlen == sizeof(nxml->snapshot_hash));
+ memcpy(hash, nxml->snapshot_hash, hlen);
+ /*
+ * Ensure that the serial is correct in case a previous
+ * delta request failed.
+ */
+ nxml->current->serial = nxml->serial;
+ return nxml->snapshot_uri;
+ case DELTA:
+ /* first bump serial, then use first delta */
+ nxml->current->serial += 1;
+ d = TAILQ_FIRST(&nxml->delta_q);
+ assert(d->serial == nxml->current->serial);
+ assert(hlen == sizeof(d->hash));
+ memcpy(hash, d->hash, hlen);
+ return d->uri;
+ default:
+ errx(1, "%s: bad task", __func__);
+ }
+}
+
+/*
+ * Pop first element from the delta queue. Return non-0 if this was the last
+ * delta to fetch.
+ */
+int
+notification_delta_done(struct notification_xml *nxml)
+{
+ struct delta_item *d;
+
+ d = TAILQ_FIRST(&nxml->delta_q);
+ assert(d->serial == nxml->current->serial);
+ TAILQ_REMOVE(&nxml->delta_q, d, q);
+ free_delta(d);
+
+ assert(!TAILQ_EMPTY(&nxml->delta_q) ||
+ nxml->serial == nxml->current->serial);
+ return TAILQ_EMPTY(&nxml->delta_q);
+}
+
+void
+log_notification_xml(struct notification_xml *nxml)
+{
+ logx("session_id: %s, serial: %lld", nxml->session_id, nxml->serial);
+ logx("snapshot_uri: %s", nxml->snapshot_uri);
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
+ * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <err.h>
+#include <limits.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <expat.h>
+
+#include "extern.h"
+#include "rrdp.h"
+
+enum snapshot_scope {
+ SNAPSHOT_SCOPE_NONE,
+ SNAPSHOT_SCOPE_SNAPSHOT,
+ SNAPSHOT_SCOPE_PUBLISH,
+ SNAPSHOT_SCOPE_END
+};
+
+struct snapshot_xml {
+ XML_Parser parser;
+ struct rrdp_session *current;
+ struct rrdp *rrdp;
+ struct publish_xml *pxml;
+ char *session_id;
+ long long serial;
+ int version;
+ enum snapshot_scope scope;
+};
+
+static void
+start_snapshot_elem(struct snapshot_xml *sxml, const char **attr)
+{
+ XML_Parser p = sxml->parser;
+ int has_xmlns = 0;
+ int i;
+
+ if (sxml->scope != SNAPSHOT_SCOPE_NONE)
+ PARSE_FAIL(p,
+ "parse failed - entered snapshot elem unexpectedely");
+ for (i = 0; attr[i]; i += 2) {
+ const char *errstr;
+ if (strcmp("xmlns", attr[i]) == 0) {
+ has_xmlns = 1;
+ continue;
+ }
+ if (strcmp("version", attr[i]) == 0) {
+ sxml->version = strtonum(attr[i + 1],
+ 1, MAX_VERSION, &errstr);
+ if (errstr == NULL)
+ continue;
+ }
+ if (strcmp("session_id", attr[i]) == 0) {
+ sxml->session_id = xstrdup(attr[i + 1]);
+ continue;
+ }
+ if (strcmp("serial", attr[i]) == 0) {
+ sxml->serial = strtonum(attr[i + 1],
+ 1, LLONG_MAX, &errstr);
+ if (errstr == NULL)
+ continue;
+ }
+ PARSE_FAIL(p,
+ "parse failed - non conforming "
+ "attribute found in snapshot elem");
+ }
+ if (!(has_xmlns && sxml->version && sxml->session_id && sxml->serial))
+ PARSE_FAIL(p,
+ "parse failed - incomplete snapshot attributes");
+ if (strcmp(sxml->current->session_id, sxml->session_id) != 0)
+ PARSE_FAIL(p, "parse failed - session_id mismatch");
+ if (sxml->current->serial != sxml->serial)
+ PARSE_FAIL(p, "parse failed - serial mismatch");
+
+ sxml->scope = SNAPSHOT_SCOPE_SNAPSHOT;
+}
+
+static void
+end_snapshot_elem(struct snapshot_xml *sxml)
+{
+ XML_Parser p = sxml->parser;
+
+ if (sxml->scope != SNAPSHOT_SCOPE_SNAPSHOT)
+ PARSE_FAIL(p, "parse failed - exited snapshot "
+ "elem unexpectedely");
+ sxml->scope = SNAPSHOT_SCOPE_END;
+}
+
+static void
+start_publish_elem(struct snapshot_xml *sxml, const char **attr)
+{
+ XML_Parser p = sxml->parser;
+ char *uri = NULL;
+ int i, hasUri = 0;
+
+ if (sxml->scope != SNAPSHOT_SCOPE_SNAPSHOT)
+ PARSE_FAIL(p,
+ "parse failed - entered publish elem unexpectedely");
+ for (i = 0; attr[i]; i += 2) {
+ if (strcmp("uri", attr[i]) == 0 && hasUri++ == 0) {
+ if (valid_uri(attr[i + 1], strlen(attr[i + 1]),
+ "rsync://")) {
+ uri = xstrdup(attr[i + 1]);
+ continue;
+ }
+ }
+ /*
+ * XXX it seems people can not write proper XML, ignore
+ * bogus xmlns attribute on publish elements.
+ */
+ if (strcmp("xmlns", attr[i]) == 0)
+ continue;
+ PARSE_FAIL(p, "parse failed - non conforming"
+ " attribute '%s' found in publish elem", attr[i]);
+ }
+ if (hasUri != 1)
+ PARSE_FAIL(p, "parse failed - incomplete publish attributes");
+ sxml->pxml = new_publish_xml(PUB_ADD, uri, NULL, 0);
+ sxml->scope = SNAPSHOT_SCOPE_PUBLISH;
+}
+
+static void
+end_publish_elem(struct snapshot_xml *sxml)
+{
+ XML_Parser p = sxml->parser;
+
+ if (sxml->scope != SNAPSHOT_SCOPE_PUBLISH)
+ PARSE_FAIL(p, "parse failed - exited publish "
+ "elem unexpectedely");
+
+ if (publish_done(sxml->rrdp, sxml->pxml) != 0)
+ PARSE_FAIL(p, "parse failed - bad publish elem");
+ sxml->pxml = NULL;
+
+ sxml->scope = SNAPSHOT_SCOPE_SNAPSHOT;
+}
+
+static void
+snapshot_xml_elem_start(void *data, const char *el, const char **attr)
+{
+ struct snapshot_xml *sxml = data;
+ XML_Parser p = sxml->parser;
+
+ /*
+ * Can only enter here once as we should have no ways to get back to
+ * NONE scope
+ */
+ if (strcmp("snapshot", el) == 0)
+ start_snapshot_elem(sxml, attr);
+ /*
+ * Will enter here multiple times, BUT never nested. will start
+ * collecting character data in that handler mem is cleared in end
+ * block, (TODO or on parse failure)
+ */
+ else if (strcmp("publish", el) == 0)
+ start_publish_elem(sxml, attr);
+ else
+ PARSE_FAIL(p, "parse failed - unexpected elem exit found");
+}
+
+static void
+snapshot_xml_elem_end(void *data, const char *el)
+{
+ struct snapshot_xml *sxml = data;
+ XML_Parser p = sxml->parser;
+
+ if (strcmp("snapshot", el) == 0)
+ end_snapshot_elem(sxml);
+ else if (strcmp("publish", el) == 0)
+ end_publish_elem(sxml);
+ else
+ PARSE_FAIL(p, "parse failed - unexpected elem exit found");
+}
+
+static void
+snapshot_content_handler(void *data, const char *content, int length)
+{
+ struct snapshot_xml *sxml = data;
+
+ if (sxml->scope == SNAPSHOT_SCOPE_PUBLISH)
+ publish_add_content(sxml->pxml, content, length);
+}
+
+struct snapshot_xml *
+new_snapshot_xml(XML_Parser p, struct rrdp_session *rs, struct rrdp *r)
+{
+ struct snapshot_xml *sxml;
+
+ if ((sxml = calloc(1, sizeof(*sxml))) == NULL)
+ err(1, "%s", __func__);
+ sxml->parser = p;
+ sxml->current = rs;
+ sxml->rrdp = r;
+
+ if (XML_ParserReset(sxml->parser, "US-ASCII") != XML_TRUE)
+ errx(1, "%s: XML_ParserReset failed", __func__);
+
+ XML_SetElementHandler(sxml->parser, snapshot_xml_elem_start,
+ snapshot_xml_elem_end);
+ XML_SetCharacterDataHandler(sxml->parser, snapshot_content_handler);
+ XML_SetUserData(sxml->parser, sxml);
+
+ return sxml;
+}
+
+void
+free_snapshot_xml(struct snapshot_xml *sxml)
+{
+ if (sxml == NULL)
+ return;
+
+ free(sxml->session_id);
+ free_publish_xml(sxml->pxml);
+ free(sxml);
+}
+
+void
+log_snapshot_xml(struct snapshot_xml *sxml)
+{
+ logx("scope: %d", sxml->scope);
+ logx("version: %d", sxml->version);
+ logx("session_id: %s serial: %lld", sxml->session_id, sxml->serial);
+}