Improve detection of RRDP session desynchronization
authorclaudio <claudio@openbsd.org>
Fri, 23 Jun 2023 11:36:24 +0000 (11:36 +0000)
committerclaudio <claudio@openbsd.org>
Fri, 23 Jun 2023 11:36:24 +0000 (11:36 +0000)
According to RFC 8182, a given session_id and serial number represent an
immutable record of the state of the Repository Server at a certain
point in time.

Add a check to the RRDP notification file processing to compare whether
the delta hashes associated to previously seen serials are different in
newly fetched notification files. Fall back to a snapshot if a difference
is detected, because such a mutation is a strong desynchronization
indicator.

Idea from Ties de Kock (RIPE NCC).
Based on a diff by job@
With and OK job@ tb@

usr.sbin/rpki-client/extern.h
usr.sbin/rpki-client/main.c
usr.sbin/rpki-client/repo.c
usr.sbin/rpki-client/rrdp.c
usr.sbin/rpki-client/rrdp_notification.c

index ec6c257..abc945c 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: extern.h,v 1.184 2023/06/07 10:46:34 job Exp $ */
+/*     $OpenBSD: extern.h,v 1.185 2023/06/23 11:36:24 claudio Exp $ */
 /*
  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
  *
@@ -486,6 +486,9 @@ enum rrdp_msg {
        RRDP_ABORT,
 };
 
+/* Maximum number of delta files per RRDP notification file. */
+#define MAX_RRDP_DELTAS                300
+
 /*
  * RRDP session state, needed to pickup at the right spot on next run.
  */
@@ -493,6 +496,7 @@ struct rrdp_session {
        char                    *last_mod;
        char                    *session_id;
        long long                serial;
+       char                    *deltas[MAX_RRDP_DELTAS];
 };
 
 /*
@@ -759,7 +763,11 @@ void                proc_rrdp(int) __attribute__((noreturn));
 /* Repository handling */
 int             filepath_add(struct filepath_tree *, char *, time_t);
 void            rrdp_clear(unsigned int);
-void            rrdp_save_state(unsigned int, struct rrdp_session *);
+void            rrdp_session_save(unsigned int, struct rrdp_session *);
+void            rrdp_session_free(struct rrdp_session *);
+void            rrdp_session_buffer(struct ibuf *,
+                   const struct rrdp_session *);
+struct rrdp_session    *rrdp_session_read(struct ibuf *);
 int             rrdp_handle_file(unsigned int, enum publish_type, char *,
                    char *, size_t, char *, size_t);
 char           *repo_basedir(const struct repo *, int);
@@ -944,9 +952,6 @@ int mkpathat(int, const char *);
 /* Maximum number of delegated hosting locations (repositories) for each TAL. */
 #define MAX_REPO_PER_TAL       1000
 
-/* Maximum number of delta files per RRDP notification file. */
-#define MAX_RRDP_DELTAS                300
-
 /*
  * Time - Evaluation time is used as the current time if it is
  * larger than X509_TIME_MIN, otherwise the system time is used.
index 39fd293..771b8cf 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: main.c,v 1.242 2023/06/20 15:15:14 claudio Exp $ */
+/*     $OpenBSD: main.c,v 1.243 2023/06/23 11:36:24 claudio Exp $ */
 /*
  * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
@@ -280,9 +280,8 @@ rrdp_fetch(unsigned int id, const char *uri, const char *local,
        io_simple_buffer(b, &id, sizeof(id));
        io_str_buffer(b, local);
        io_str_buffer(b, uri);
-       io_str_buffer(b, s->session_id);
-       io_simple_buffer(b, &s->serial, sizeof(s->serial));
-       io_str_buffer(b, s->last_mod);
+
+       rrdp_session_buffer(b, s);
        io_close_buffer(&rrdpq, b);
 }
 
@@ -679,7 +678,7 @@ rrdp_process(struct ibuf *b)
 {
        enum rrdp_msg type;
        enum publish_type pt;
-       struct rrdp_session s;
+       struct rrdp_session *s;
        char *uri, *last_mod, *data;
        char hash[SHA256_DIGEST_LENGTH];
        size_t dsz;
@@ -700,12 +699,9 @@ rrdp_process(struct ibuf *b)
                rrdp_http_fetch(id, uri, last_mod);
                break;
        case RRDP_SESSION:
-               io_read_str(b, &s.session_id);
-               io_read_buf(b, &s.serial, sizeof(s.serial));
-               io_read_str(b, &s.last_mod);
-               rrdp_save_state(id, &s);
-               free(s.session_id);
-               free(s.last_mod);
+               s = rrdp_session_read(b);
+               rrdp_session_save(id, s);
+               rrdp_session_free(s);
                break;
        case RRDP_FILE:
                io_read_buf(b, &pt, sizeof(pt));
index 009ce25..ecaefc4 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: repo.c,v 1.47 2023/05/30 16:02:28 job Exp $ */
+/*     $OpenBSD: repo.c,v 1.48 2023/06/23 11:36:24 claudio Exp $ */
 /*
  * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
@@ -619,22 +619,26 @@ repo_alloc(int talid)
  * Parse the RRDP state file if it exists and set the session struct
  * based on that information.
  */
-static void
-rrdp_parse_state(const struct rrdprepo *rr, struct rrdp_session *state)
+static struct rrdp_session *
+rrdp_session_parse(const struct rrdprepo *rr)
 {
        FILE *f;
-       int fd, ln = 0;
+       struct rrdp_session *state;
+       int fd, ln = 0, deltacnt = 0;
        const char *errstr;
        char *line = NULL, *file;
        size_t len = 0;
        ssize_t n;
 
+       if ((state = calloc(1, sizeof(*state))) == NULL)
+               err(1, NULL);
+
        file = rrdp_state_filename(rr, 0);
        if ((fd = open(file, O_RDONLY)) == -1) {
                if (errno != ENOENT)
                        warn("%s: open state file", rr->basedir);
                free(file);
-               return;
+               return state;
        }
        free(file);
        f = fdopen(fd, "r");
@@ -655,39 +659,47 @@ rrdp_parse_state(const struct rrdprepo *rr, struct rrdp_session *state)
                                goto fail;
                        break;
                case 2:
+                       if (strcmp(line, "-") == 0)
+                               break;
                        if ((state->last_mod = strdup(line)) == NULL)
                                err(1, NULL);
                        break;
                default:
-                       goto fail;
+                       if (deltacnt >= MAX_RRDP_DELTAS)
+                               goto fail;
+                       if ((state->deltas[deltacnt++] = strdup(line)) == NULL)
+                               err(1, NULL);
+                       break;
                }
                ln++;
        }
 
-       free(line);
        if (ferror(f))
                goto fail;
        fclose(f);
-       return;
+       free(line);
+       return state;
 
-fail:
+ fail:
        warnx("%s: troubles reading state file", rr->basedir);
        fclose(f);
+       free(line);
        free(state->session_id);
        free(state->last_mod);
        memset(state, 0, sizeof(*state));
+       return state;
 }
 
 /*
  * Carefully write the RRDP session state file back.
  */
 void
-rrdp_save_state(unsigned int id, struct rrdp_session *state)
+rrdp_session_save(unsigned int id, struct rrdp_session *state)
 {
        struct rrdprepo *rr;
        char *temp, *file;
-       FILE *f;
-       int fd;
+       FILE *f = NULL;
+       int fd, i;
 
        rr = rrdp_find(id);
        if (rr == NULL)
@@ -696,10 +708,8 @@ rrdp_save_state(unsigned int id, struct rrdp_session *state)
        file = rrdp_state_filename(rr, 0);
        temp = rrdp_state_filename(rr, 1);
 
-       if ((fd = mkostemp(temp, O_CLOEXEC)) == -1) {
-               warn("mkostemp %s", temp);
+       if ((fd = mkostemp(temp, O_CLOEXEC)) == -1)
                goto fail;
-       }
        (void)fchmod(fd, 0644);
        f = fdopen(fd, "w");
        if (f == NULL)
@@ -707,37 +717,94 @@ rrdp_save_state(unsigned int id, struct rrdp_session *state)
 
        /* write session state file out */
        if (fprintf(f, "%s\n%lld\n", state->session_id,
-           state->serial) < 0) {
-               fclose(f);
+           state->serial) < 0)
                goto fail;
-       }
+
        if (state->last_mod != NULL) {
-               if (fprintf(f, "%s\n", state->last_mod) < 0) {
-                       fclose(f);
+               if (fprintf(f, "%s\n", state->last_mod) < 0)
+                       goto fail;
+       } else {
+               if (fprintf(f, "-\n") < 0)
                        goto fail;
-               }
        }
-       if (fclose(f) != 0)
+       for (i = 0; state->deltas[i] != NULL; i++) {
+               if (fprintf(f, "%s\n", state->deltas[i]) < 0)
+                       goto fail;
+       }
+       if (fclose(f) != 0) {
+               f = NULL;
                goto fail;
+       }
 
-       if (rename(temp, file) == -1)
-               warn("%s: rename state file", rr->basedir);
+       if (rename(temp, file) == -1) {
+               warn("%s: rename %s to %s", rr->basedir, temp, file);
+               unlink(temp);
+       }
 
        free(temp);
        free(file);
        return;
 
-fail:
-       warnx("%s: failed to save state", rr->basedir);
+ fail:
+       warn("%s: save state to %s", rr->basedir, temp);
+       if (f != NULL)
+               fclose(f);
        unlink(temp);
        free(temp);
        free(file);
 }
 
+/*
+ * Free an rrdp_session pointer. Safe to call with NULL.
+ */
+void
+rrdp_session_free(struct rrdp_session *s)
+{
+       size_t i;
+
+       if (s == NULL)
+               return;
+       free(s->session_id);
+       free(s->last_mod);
+       for (i = 0; i < sizeof(s->deltas) / sizeof(s->deltas[0]); i++)
+               free(s->deltas[i]);
+       free(s);
+}
+
+void
+rrdp_session_buffer(struct ibuf *b, const struct rrdp_session *s)
+{
+       size_t i;
+
+       io_str_buffer(b, s->session_id);
+       io_simple_buffer(b, &s->serial, sizeof(s->serial));
+       io_str_buffer(b, s->last_mod);
+       for (i = 0; i < sizeof(s->deltas) / sizeof(s->deltas[0]); i++)
+               io_str_buffer(b, s->deltas[i]);
+}
+
+struct rrdp_session *
+rrdp_session_read(struct ibuf *b)
+{
+       struct rrdp_session *s;
+       size_t i;
+
+       if ((s = calloc(1, sizeof(*s))) == NULL)
+               err(1, NULL);
+
+       io_read_str(b, &s->session_id);
+       io_read_buf(b, &s->serial, sizeof(s->serial));
+       io_read_str(b, &s->last_mod);
+       for (i = 0; i < sizeof(s->deltas) / sizeof(s->deltas[0]); i++)
+               io_read_str(b, &s->deltas[i]);
+
+       return s;
+}
+
 static struct rrdprepo *
 rrdp_get(const char *uri)
 {
-       struct rrdp_session state = { 0 };
+       struct rrdp_session *state;
        struct rrdprepo *rr;
 
        SLIST_FOREACH(rr, &rrdprepos, entry)
@@ -767,10 +834,9 @@ rrdp_get(const char *uri)
        }
 
        /* parse state and start the sync */
-       rrdp_parse_state(rr, &state);
-       rrdp_fetch(rr->id, rr->notifyuri, rr->notifyuri, &state);
-       free(state.session_id);
-       free(state.last_mod);
+       state = rrdp_session_parse(rr);
+       rrdp_fetch(rr->id, rr->notifyuri, rr->notifyuri, state);
+       rrdp_session_free(state);
 
        logx("%s: pulling from %s", rr->notifyuri, "network");
 
@@ -1232,7 +1298,7 @@ repo_proto(const struct repo *rp)
 
        if (rp->ta != NULL) {
                const struct tarepo *tr = rp->ta;
-               if (tr->uriidx < tr->urisz && 
+               if (tr->uriidx < tr->urisz &&
                    strncasecmp(tr->uri[tr->uriidx], "rsync://", 8) == 0)
                        return "rsync";
                else
@@ -1667,14 +1733,14 @@ repo_cleanup_entry(FTSENT *e, struct filepath_tree *tree, int cachefd)
                                        logx("deleted superfluous %s", path);
                                if (fts_state.rp != NULL)
                                        fts_state.rp->repostats.del_extra_files++;
-                               else 
+                               else
                                        stats.repo_stats.del_extra_files++;
                        } else {
                                if (verbose > 1)
                                        logx("deleted %s", path);
                                if (fts_state.rp != NULL)
                                        fts_state.rp->repostats.del_files++;
-                               else 
+                               else
                                        stats.repo_stats.del_files++;
                        }
                }
@@ -1728,7 +1794,7 @@ repo_cleanup_entry(FTSENT *e, struct filepath_tree *tree, int cachefd)
                                warn("rmdir %s", path);
                        if (fts_state.rp != NULL)
                                fts_state.rp->repostats.del_dirs++;
-                       else 
+                       else
                                stats.repo_stats.del_dirs++;
                }
                break;
@@ -1795,7 +1861,8 @@ repo_free(void)
 }
 
 /*
- * Remove all files and directories under base but do not remove base itself.
+ * Remove all files and directories under base.
+ * Do not remove base directory itself and the .state file.
  */
 static void
 remove_contents(char *base)
@@ -1812,6 +1879,9 @@ remove_contents(char *base)
                case FTS_NSOK:
                case FTS_SL:
                case FTS_SLNONE:
+                       if (e->fts_level == 1 &&
+                           strcmp(e->fts_name, ".state") == 0)
+                               break;
                        if (unlink(e->fts_accpath) == -1)
                                warn("unlink %s", e->fts_path);
                        break;
index 08a861a..d7814c0 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: rrdp.c,v 1.31 2023/06/20 15:15:14 claudio Exp $ */
+/*     $OpenBSD: rrdp.c,v 1.32 2023/06/23 11:36:24 claudio Exp $ */
 /*
  * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
  * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
@@ -65,8 +65,8 @@ struct rrdp {
        char                     hash[SHA256_DIGEST_LENGTH];
        SHA256_CTX               ctx;
 
-       struct rrdp_session      repository;
-       struct rrdp_session      current;
+       struct rrdp_session     *repository;
+       struct rrdp_session     *current;
        XML_Parser               parser;
        struct notification_xml *nxml;
        struct snapshot_xml     *sxml;
@@ -135,9 +135,7 @@ rrdp_state_send(struct rrdp *s)
        b = io_new_buffer();
        io_simple_buffer(b, &type, sizeof(type));
        io_simple_buffer(b, &s->id, sizeof(s->id));
-       io_str_buffer(b, s->current.session_id);
-       io_simple_buffer(b, &s->current.serial, sizeof(s->current.serial));
-       io_str_buffer(b, s->current.last_mod);
+       rrdp_session_buffer(b, s->current);
        io_close_buffer(&msgq, b);
 }
 
@@ -182,8 +180,7 @@ rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml,
 }
 
 static void
-rrdp_new(unsigned int id, char *local, char *notify, char *session_id,
-    long long serial, char *last_mod)
+rrdp_new(unsigned int id, char *local, char *notify, struct rrdp_session *state)
 {
        struct rrdp *s;
 
@@ -194,15 +191,15 @@ rrdp_new(unsigned int id, char *local, char *notify, char *session_id,
        s->id = id;
        s->local = local;
        s->notifyuri = notify;
-       s->repository.session_id = session_id;
-       s->repository.serial = serial;
-       s->repository.last_mod = last_mod;
+       s->repository = state;
+       if ((s->current = calloc(1, sizeof(*s->current))) == NULL)
+               err(1, NULL);
 
        s->state = RRDP_STATE_REQ;
        if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL)
                err(1, "XML_ParserCreate");
 
-       s->nxml = new_notification_xml(s->parser, &s->repository, &s->current,
+       s->nxml = new_notification_xml(s->parser, s->repository, s->current,
            notify);
 
        TAILQ_INSERT_TAIL(&states, s, entry);
@@ -227,10 +224,8 @@ rrdp_free(struct rrdp *s)
        free(s->notifyuri);
        free(s->local);
        free(s->last_mod);
-       free(s->repository.last_mod);
-       free(s->repository.session_id);
-       free(s->current.last_mod);
-       free(s->current.session_id);
+       rrdp_session_free(s->repository);
+       rrdp_session_free(s->current);
 
        free(s);
 }
@@ -259,7 +254,7 @@ rrdp_failed(struct rrdp *s)
                free_delta_xml(s->dxml);
                s->dxml = NULL;
                rrdp_clear_repo(s);
-               s->sxml = new_snapshot_xml(s->parser, &s->current, s);
+               s->sxml = new_snapshot_xml(s->parser, s->current, s);
                s->task = SNAPSHOT;
                s->state = RRDP_STATE_REQ;
                logx("%s: delta sync failed, fallback to snapshot", s->local);
@@ -322,26 +317,26 @@ rrdp_finished(struct rrdp *s)
                        switch (s->task) {
                        case NOTIFICATION:
                                logx("%s: repository not modified (%s#%lld)",
-                                   s->local, s->repository.session_id,
-                                   s->repository.serial);
+                                   s->local, s->repository->session_id,
+                                   s->repository->serial);
                                rrdp_state_send(s);
                                rrdp_free(s);
                                rrdp_done(id, 1);
                                break;
                        case SNAPSHOT:
                                logx("%s: downloading snapshot (%s#%lld)",
-                                   s->local, s->current.session_id,
-                                   s->current.serial);
+                                   s->local, s->current->session_id,
+                                   s->current->serial);
                                rrdp_clear_repo(s);
-                               s->sxml = new_snapshot_xml(p, &s->current, s);
+                               s->sxml = new_snapshot_xml(p, s->current, s);
                                s->state = RRDP_STATE_REQ;
                                break;
                        case DELTA:
                                logx("%s: downloading %lld deltas (%s#%lld)",
                                    s->local,
-                                   s->repository.serial - s->current.serial,
-                                   s->current.session_id, s->current.serial);
-                               s->dxml = new_delta_xml(p, &s->current, s);
+                                   s->repository->serial - s->current->serial,
+                                   s->current->session_id, s->current->serial);
+                               s->dxml = new_delta_xml(p, s->current, s);
                                s->state = RRDP_STATE_REQ;
                                break;
                        }
@@ -360,14 +355,14 @@ rrdp_finished(struct rrdp *s)
                        } else {
                                /* reset delta parser for next delta */
                                free_delta_xml(s->dxml);
-                               s->dxml = new_delta_xml(p, &s->current, s);
+                               s->dxml = new_delta_xml(p, s->current, s);
                                s->state = RRDP_STATE_REQ;
                        }
                        break;
                }
        } else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) {
                logx("%s: notification file not modified (%s#%lld)", s->local,
-                   s->repository.session_id, s->repository.serial);
+                   s->repository->session_id, s->repository->serial);
                /* no need to update state file */
                rrdp_free(s);
                rrdp_done(id, 1);
@@ -408,12 +403,12 @@ static void
 rrdp_input_handler(int fd)
 {
        static struct ibuf *inbuf;
-       char *local, *notify, *session_id, *last_mod;
+       struct rrdp_session *state;
+       char *local, *notify, *last_mod;
        struct ibuf *b;
        struct rrdp *s;
        enum rrdp_msg type;
        enum http_result res;
-       long long serial;
        unsigned int id;
        int ok;
 
@@ -426,15 +421,12 @@ rrdp_input_handler(int fd)
 
        switch (type) {
        case RRDP_START:
-               io_read_str(b, &local);
-               io_read_str(b, &notify);
-               io_read_str(b, &session_id);
-               io_read_buf(b, &serial, sizeof(serial));
-               io_read_str(b, &last_mod);
                if (ibuf_fd_avail(b))
                        errx(1, "received unexpected fd");
-
-               rrdp_new(id, local, notify, session_id, serial, last_mod);
+               io_read_str(b, &local);
+               io_read_str(b, &notify);
+               state = rrdp_session_read(b);
+               rrdp_new(id, local, notify, state);
                break;
        case RRDP_HTTP_INI:
                s = rrdp_get(id);
@@ -569,7 +561,7 @@ proc_rrdp(int fd)
                                switch (s->task) {
                                case NOTIFICATION:
                                        rrdp_http_req(s->id, s->notifyuri,
-                                           s->repository.last_mod);
+                                           s->repository->last_mod);
                                        break;
                                case SNAPSHOT:
                                case DELTA:
index b135eca..dec5f21 100644 (file)
@@ -1,4 +1,4 @@
-/*     $OpenBSD: rrdp_notification.c,v 1.17 2023/01/04 14:22:43 claudio Exp $ */
+/*     $OpenBSD: rrdp_notification.c,v 1.18 2023/06/23 11:36:24 claudio Exp $ */
 /*
  * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
  * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
@@ -60,6 +60,7 @@ struct notification_xml {
        char                     snapshot_hash[SHA256_DIGEST_LENGTH];
        struct delta_q           delta_q;
        long long                serial;
+       long long                min_serial;
        int                      version;
        enum notification_scope  scope;
 };
@@ -101,6 +102,21 @@ add_delta(struct notification_xml *nxml, const char *uri,
        return 1;
 }
 
+/* check that there are no holes in the list */
+static int
+check_delta(struct notification_xml *nxml)
+{
+       struct delta_item *d;
+       long long serial = 0;
+
+       TAILQ_FOREACH(d, &nxml->delta_q, q) {
+               if (serial != 0 && serial + 1 != d->serial)
+                       return 0;
+               serial = d->serial;
+       }
+       return 1;
+}
+
 static void
 free_delta(struct delta_item *d)
 {
@@ -108,6 +124,36 @@ free_delta(struct delta_item *d)
        free(d);
 }
 
+/*
+ * Parse a delta serial and hash line at idx from the rrdp session state.
+ * Return the serial or 0 on error. If hash is non-NULL, it is set to the
+ * start of the hash string on success.
+ */
+static long long
+delta_parse(struct rrdp_session *s, size_t idx, char **hash)
+{
+       long long serial;
+       char *line, *ep;
+
+       if (hash != NULL)
+               *hash = NULL;
+       if (idx < 0 || idx >= sizeof(s->deltas) / sizeof(s->deltas[0]))
+               return 0;
+       if ((line = s->deltas[idx]) == NULL)
+               return 0;
+
+       errno = 0;
+       serial = strtoll(line, &ep, 10);
+       if (line[0] == '\0' || *ep != ' ')
+               return 0;
+       if (serial <= 0 || (errno == ERANGE && serial == LLONG_MAX))
+               return 0;
+
+       if (hash != NULL)
+               *hash = ep + 1;
+       return serial;
+}
+
 static void
 start_notification_elem(struct notification_xml *nxml, const char **attr)
 {
@@ -149,6 +195,10 @@ start_notification_elem(struct notification_xml *nxml, const char **attr)
                PARSE_FAIL(p, "parse failed - incomplete "
                    "notification attributes");
 
+       /* Limit deltas to the ones which matter for us. */
+       if (nxml->min_serial == 0 && nxml->serial > MAX_RRDP_DELTAS)
+               nxml->min_serial = nxml->serial - MAX_RRDP_DELTAS;
+
        nxml->scope = NOTIFICATION_SCOPE_NOTIFICATION;
 }
 
@@ -161,6 +211,9 @@ end_notification_elem(struct notification_xml *nxml)
                PARSE_FAIL(p, "parse failed - exited notification "
                    "elem unexpectedely");
        nxml->scope = NOTIFICATION_SCOPE_END;
+
+       if (!check_delta(nxml))
+               PARSE_FAIL(p, "parse failed - delta list has holes");
 }
 
 static void
@@ -247,11 +300,12 @@ start_delta_elem(struct notification_xml *nxml, const char **attr)
        if (hasUri != 1 || hasHash != 1 || delta_serial == 0)
                PARSE_FAIL(p, "parse failed - incomplete delta attributes");
 
+       /* Delta serial must be smaller or equal to the notification serial */
+       if (nxml->serial < delta_serial)
+               PARSE_FAIL(p, "parse failed - bad delta serial");
+
        /* optimisation, add only deltas that could be interesting */
-       if (nxml->repository->serial != 0 &&
-           nxml->repository->serial < delta_serial &&
-           nxml->repository->session_id != NULL &&
-           strcmp(nxml->session_id, nxml->repository->session_id) == 0) {
+       if (nxml->min_serial < delta_serial) {
                if (add_delta(nxml, delta_uri, delta_hash, delta_serial) == 0)
                        PARSE_FAIL(p, "parse failed - adding delta failed");
        }
@@ -333,6 +387,7 @@ new_notification_xml(XML_Parser p, struct rrdp_session *repository,
        nxml->repository = repository;
        nxml->current = current;
        nxml->notifyuri = notifyuri;
+       nxml->min_serial = delta_parse(repository, 0, NULL);
 
        XML_SetElementHandler(nxml->parser, notification_xml_elem_start,
            notification_xml_elem_end);
@@ -343,6 +398,16 @@ new_notification_xml(XML_Parser p, struct rrdp_session *repository,
        return nxml;
 }
 
+static void
+free_delta_queue(struct notification_xml *nxml)
+{
+       while (!TAILQ_EMPTY(&nxml->delta_q)) {
+               struct delta_item *d = TAILQ_FIRST(&nxml->delta_q);
+               TAILQ_REMOVE(&nxml->delta_q, d, q);
+               free_delta(d);
+       }
+}
+
 void
 free_notification_xml(struct notification_xml *nxml)
 {
@@ -351,14 +416,88 @@ free_notification_xml(struct notification_xml *nxml)
 
        free(nxml->session_id);
        free(nxml->snapshot_uri);
-       while (!TAILQ_EMPTY(&nxml->delta_q)) {
-               struct delta_item *d = TAILQ_FIRST(&nxml->delta_q);
-               TAILQ_REMOVE(&nxml->delta_q, d, q);
-               free_delta(d);
-       }
+       free_delta_queue(nxml);
        free(nxml);
 }
 
+/*
+ * Collect a list of deltas to store in the repository state.
+ */
+static void
+notification_collect_deltas(struct notification_xml *nxml)
+{
+       struct delta_item *d;
+       long long keep_serial = 0;
+       size_t cur_idx = 0, max_deltas;
+       char *hash;
+
+       max_deltas =
+           sizeof(nxml->current->deltas) / sizeof(nxml->current->deltas[0]);
+
+       if (nxml->serial > (long long)max_deltas)
+               keep_serial = nxml->serial - max_deltas + 1;
+
+       TAILQ_FOREACH(d, &nxml->delta_q, q) {
+               if (d->serial >= keep_serial) {
+                       assert(cur_idx < max_deltas);
+                       hash = hex_encode(d->hash, sizeof(d->hash));
+                       if (asprintf(&nxml->current->deltas[cur_idx++],
+                           "%lld %s", d->serial, hash) == -1)
+                               err(1, NULL);
+                       free(hash);
+               }
+       }
+}
+
+/*
+ * Validate the delta list with the information from the repository state.
+ * Remove all obsolete deltas so that the list starts with the delta with
+ * serial nxml->repository->serial + 1.
+ * Returns 1 if all deltas were valid and 0 on failure.
+ */
+static int
+notification_check_deltas(struct notification_xml *nxml)
+{
+       struct delta_item *d, *nextd;
+       char *hash, *exp_hash;
+       long long exp_serial, new_serial;
+       size_t exp_idx = 0;
+
+       exp_serial = delta_parse(nxml->repository, exp_idx++, &exp_hash);
+       new_serial = nxml->repository->serial + 1;
+
+       /* compare hash of delta against repository state info */
+       TAILQ_FOREACH_SAFE(d, &nxml->delta_q, q, nextd) {
+               while (exp_serial != 0  && exp_serial < d->serial) {
+                       exp_serial = delta_parse(nxml->repository,
+                           exp_idx++, &exp_hash);
+               }
+
+               if (d->serial == exp_serial) {
+                       hash = hex_encode(d->hash, sizeof(d->hash));
+                       if (strcmp(hash, exp_hash) != 0) {
+                               warnx("%s: %s#%lld unexpected delta "
+                                   "mutation (expected %s, got %s)",
+                                   nxml->notifyuri, nxml->session_id,
+                                   exp_serial, hash, exp_hash);
+                               free(hash);
+                               return 0;
+                       }
+                       free(hash);
+                       exp_serial = delta_parse(nxml->repository,
+                           exp_idx++, &exp_hash);
+               }
+
+               /* is this delta needed? */
+               if (d->serial < new_serial) {
+                       TAILQ_REMOVE(&nxml->delta_q, d, q);
+                       free_delta(d);
+               }
+       }
+
+       return 1;
+}
+
 /*
  * Finalize notification step, decide if a delta update is possible
  * if either the session_id changed or the delta files fail to cover
@@ -369,11 +508,9 @@ free_notification_xml(struct notification_xml *nxml)
 enum rrdp_task
 notification_done(struct notification_xml *nxml, char *last_mod)
 {
-       struct delta_item *d;
-       long long s, last_s = 0;
-
        nxml->current->last_mod = last_mod;
        nxml->current->session_id = xstrdup(nxml->session_id);
+       notification_collect_deltas(nxml);
 
        /* check the that the session_id was valid and still the same */
        if (nxml->repository->session_id == NULL ||
@@ -384,6 +521,10 @@ notification_done(struct notification_xml *nxml, char *last_mod)
        if (nxml->repository->serial == 0)
                goto snapshot;
 
+       /* check that all needed deltas are available and valid */
+       if (!notification_check_deltas(nxml))
+               goto snapshot;
+
        if (nxml->repository->serial > nxml->serial)
                warnx("%s: serial number decreased from %lld to %lld",
                    nxml->notifyuri, nxml->repository->serial, nxml->serial);
@@ -399,14 +540,12 @@ notification_done(struct notification_xml *nxml, char *last_mod)
        if (nxml->serial - nxml->repository->serial > MAX_RRDP_DELTAS)
                goto snapshot;
 
-       /* check that all needed deltas are available */
-       s = nxml->repository->serial + 1;
-       TAILQ_FOREACH(d, &nxml->delta_q, q) {
-               if (d->serial != s++)
-                       goto snapshot;
-               last_s = d->serial;
-       }
-       if (last_s != nxml->serial)
+       /* no deltas queued */
+       if (TAILQ_EMPTY(&nxml->delta_q))
+               goto snapshot;
+
+       /* first possible delta is no match */
+       if (nxml->repository->serial + 1 != TAILQ_FIRST(&nxml->delta_q)->serial)
                goto snapshot;
 
        /* update via delta possible */
@@ -416,6 +555,7 @@ notification_done(struct notification_xml *nxml, char *last_mod)
 
 snapshot:
        /* update via snapshot download */
+       free_delta_queue(nxml);
        nxml->current->serial = nxml->serial;
        return SNAPSHOT;
 }