tipc: add name distributor resiliency queue

TIPC name table updates are distributed asynchronously in a cluster,
entailing a risk of certain race conditions. E.g., if two nodes
simultaneously issue conflicting (overlapping) publications, this may
not be detected until both publications have reached a third node, in
which case one of the publications will be silently dropped on that
node. Hence, we end up with an inconsistent name table.

In most cases this conflict is just a temporary race, e.g., one
node is issuing a publication under the assumption that a previous,
conflicting, publication has already been withdrawn by the other node.
However, because of the (RTT-related) distributed update delay, this
may not yet hold true on all nodes. The symptom of this failure is a
syslog message: "tipc: Cannot publish {%u,%u,%u}, overlap error".

In this commit we add a resiliency queue at the receiving end of
the name table distributor. When insertion of an arriving publication
fails, we retain it in this queue for a short amount of time, assuming
that another update will arrive very soon and clear the conflict. If that
happens, we insert the publication; otherwise we drop it.

The (configurable) retention value defaults to 2000 ms. Experience shows
that the situation described above is extremely rare, so there is no
risk that the queue will accumulate a large number of items.

Signed-off-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 0591f33..780ef71 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/name_distr.c: TIPC name distribution code
  *
- * Copyright (c) 2000-2006, Ericsson AB
+ * Copyright (c) 2000-2006, 2014, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -71,6 +71,21 @@
 };
 
 
+int sysctl_tipc_named_timeout __read_mostly = 2000;	/* retention time, ms */
+
+/**
+ * tipc_dist_queue - list of deferred name table updates (distr_queue_item)
+ */
+static struct list_head tipc_dist_queue = LIST_HEAD_INIT(tipc_dist_queue);
+
+struct distr_queue_item {
+	struct distr_item i;	/* the update item; fields in network byte order */
+	u32 dtype;		/* message type the item arrived with */
+	u32 node;		/* node the update originated from */
+	unsigned long expires;	/* jiffies value after which the item is dropped */
+	struct list_head next;	/* link in tipc_dist_queue */
+};
+
 /**
  * publ_to_item - add publication info to a publication message
  */
@@ -299,6 +314,52 @@
 }
 
 /**
+ * tipc_named_add_backlog - add a failed name table update to the backlog
+ * Queues a copy of @i with its @type and origin @node; no-op if kzalloc fails.
+ */
+static void tipc_named_add_backlog(struct distr_item *i, u32 type, u32 node)
+{
+	struct distr_queue_item *e;
+
+	e = kzalloc(sizeof(*e), GFP_ATOMIC);
+	if (!e)
+		return;
+	/* Copy into the embedded item itself, not the start of the entry */
+	memcpy(&e->i, i, sizeof(*i));
+	e->dtype = type;
+	e->node = node;
+	e->expires = jiffies + msecs_to_jiffies(sysctl_tipc_named_timeout);
+	list_add_tail(&e->next, &tipc_dist_queue);
+}
+
+/**
+ * tipc_named_process_backlog - retry deferred name table updates and drop
+ * the expired ones; tipc_named_rcv() calls it holding tipc_nametbl_lock.
+ */
+void tipc_named_process_backlog(void)
+{
+	struct distr_queue_item *e, *tmp;
+	char addr[16];
+	unsigned long now = jiffies;
+
+	list_for_each_entry_safe(e, tmp, &tipc_dist_queue, next) {
+		if (time_after(e->expires, now)) {
+			/* Not expired: stay queued until the update succeeds */
+			if (!tipc_update_nametbl(&e->i, e->node, e->dtype))
+				continue;
+		} else {
+			tipc_addr_string_fill(addr, e->node);
+			pr_warn_ratelimited("Dropping name table update (%d) of {%u, %u, %u} from %s key=%u\n",
+					    e->dtype, ntohl(e->i.type),
+					    ntohl(e->i.lower), ntohl(e->i.upper),
+					    addr, ntohl(e->i.key));
+		}
+		list_del(&e->next);
+		kfree(e);
+	}
+}
+
+/**
  * tipc_named_rcv - process name table update message sent by another node
  */
 void tipc_named_rcv(struct sk_buff *buf)
@@ -306,13 +367,15 @@
 	struct tipc_msg *msg = buf_msg(buf);
 	struct distr_item *item = (struct distr_item *)msg_data(msg);
 	u32 count = msg_data_sz(msg) / ITEM_SIZE;
+	u32 node = msg_orignode(msg);
 
 	write_lock_bh(&tipc_nametbl_lock);
 	while (count--) {
-		tipc_update_nametbl(item, msg_orignode(msg),
-				    msg_type(msg));
+		if (!tipc_update_nametbl(item, node, msg_type(msg)))
+			tipc_named_add_backlog(item, msg_type(msg), node);
 		item++;
 	}
+	tipc_named_process_backlog();
 	write_unlock_bh(&tipc_nametbl_lock);
 	kfree_skb(buf);
 }