afs: Overhaul cell database management

Overhaul the way that the in-kernel AFS client keeps track of cells in the
following manner:

 (1) Cells are now held in an rbtree to make walking them quicker and are
     RCU managed (though this is probably overkill).

 (2) Cells now have a manager work item that:

     (A) Looks after fetching and refreshing the VL server list.

     (B) Manages cell record lifetime, including initialisation and
     	 destruction.

     (C) Manages cell record caching whereby records are kept around for a
     	 certain time after last use and then destroyed.

     (D) Manages the FS-Cache index cookie for a cell.  It is not permitted
     	 for a cookie to be in use twice, so we have to be careful to not
     	 allow a new cell record to exist at the same time as an old record
     	 of the same name.

 (3) Each AFS network namespace is given a manager work item that manages
     the cells within it, maintaining a single timer to prod cells into
     updating their DNS records.

     This uses the timer_reduce() facility to make the timer expire at the
     soonest timed event that needs happening.

 (4) When a module is being unloaded, cells and cell managers are now
     counted out using dec_after_work() to make sure the module text is
     pinned until after the data structures have been cleaned up.

 (5) Each cell's VL server list is now protected by a seqlock rather than a
     semaphore.

Signed-off-by: David Howells <dhowells@redhat.com>
diff --git a/fs/afs/cell.c b/fs/afs/cell.c
index 216821f..e83103e 100644
--- a/fs/afs/cell.c
+++ b/fs/afs/cell.c
@@ -1,6 +1,6 @@
 /* AFS cell and server record management
  *
- * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2002, 2017 Red Hat, Inc. All Rights Reserved.
  * Written by David Howells (dhowells@redhat.com)
  *
  * This program is free software; you can redistribute it and/or
@@ -19,221 +19,324 @@
 #include <keys/rxrpc-type.h>
 #include "internal.h"
 
+unsigned __read_mostly afs_cell_gc_delay = 10;
+
+static void afs_manage_cell(struct work_struct *);
+
+static void afs_dec_cells_outstanding(struct afs_net *net)
+{
+	if (atomic_dec_and_test(&net->cells_outstanding))
+		wake_up_atomic_t(&net->cells_outstanding);
+}
+
 /*
- * allocate a cell record and fill in its name, VL server address list and
+ * Set the cell timer to fire after a given delay, assuming it's not already
+ * set for an earlier time.
+ */
+static void afs_set_cell_timer(struct afs_net *net, time64_t delay)
+{
+	if (net->live) {
+		atomic_inc(&net->cells_outstanding);
+		if (timer_reduce(&net->cells_timer, jiffies + delay * HZ))
+			afs_dec_cells_outstanding(net);
+	}
+}
+
+/*
+ * Look up and get an activation reference on a cell record under RCU
+ * conditions.  The caller must hold the RCU read lock.
+ */
+struct afs_cell *afs_lookup_cell_rcu(struct afs_net *net,
+				     const char *name, unsigned int namesz)
+{
+	struct afs_cell *cell = NULL;
+	struct rb_node *p;
+	int n, seq = 0, ret = 0;
+
+	_enter("%*.*s", namesz, namesz, name);
+
+	if (name && namesz == 0)
+		return ERR_PTR(-EINVAL);
+	if (namesz > AFS_MAXCELLNAME)
+		return ERR_PTR(-ENAMETOOLONG);
+
+	do {
+		/* Unfortunately, rbtree walking doesn't give reliable results
+		 * under just the RCU read lock, so we have to check for
+		 * changes.
+		 */
+		if (cell)
+			afs_put_cell(net, cell);
+		cell = NULL;
+		ret = -ENOENT;
+
+		read_seqbegin_or_lock(&net->cells_lock, &seq);
+
+		if (!name) {
+			cell = rcu_dereference_raw(net->ws_cell);
+			if (cell) {
+				afs_get_cell(cell);
+				continue;
+			}
+			ret = -EDESTADDRREQ;
+			continue;
+		}
+
+		p = rcu_dereference_raw(net->cells.rb_node);
+		while (p) {
+			cell = rb_entry(p, struct afs_cell, net_node);
+
+			n = strncasecmp(cell->name, name,
+					min_t(size_t, cell->name_len, namesz));
+			if (n == 0)
+				n = cell->name_len - namesz;
+			if (n < 0) {
+				p = rcu_dereference_raw(p->rb_left);
+			} else if (n > 0) {
+				p = rcu_dereference_raw(p->rb_right);
+			} else {
+				if (atomic_inc_not_zero(&cell->usage)) {
+					ret = 0;
+					break;
+				}
+				/* We want to repeat the search, this time with
+				 * the lock properly locked.
+				 */
+			}
+			cell = NULL;
+		}
+
+	} while (need_seqretry(&net->cells_lock, seq));
+
+	done_seqretry(&net->cells_lock, seq);
+
+	return ret == 0 ? cell : ERR_PTR(ret);
+}
+
+/*
+ * Set up a cell record and fill in its name, VL server address list and
  * allocate an anonymous key
  */
-static struct afs_cell *afs_cell_alloc(struct afs_net *net,
-				       const char *name, unsigned namelen,
-				       char *vllist)
+static struct afs_cell *afs_alloc_cell(struct afs_net *net,
+				       const char *name, unsigned int namelen,
+				       const char *vllist)
 {
 	struct afs_cell *cell;
-	struct key *key;
-	char keyname[4 + AFS_MAXCELLNAME + 1], *cp, *dp, *next;
-	char  *dvllist = NULL, *_vllist = NULL;
-	char  delimiter = ':';
-	int ret, i;
+	int i, ret;
 
-	_enter("%*.*s,%s", namelen, namelen, name ?: "", vllist);
-
-	BUG_ON(!name); /* TODO: want to look up "this cell" in the cache */
-
+	ASSERT(name);
+	if (namelen == 0)
+		return ERR_PTR(-EINVAL);
 	if (namelen > AFS_MAXCELLNAME) {
 		_leave(" = -ENAMETOOLONG");
 		return ERR_PTR(-ENAMETOOLONG);
 	}
 
-	/* allocate and initialise a cell record */
-	cell = kzalloc(sizeof(struct afs_cell) + namelen + 1, GFP_KERNEL);
+	_enter("%*.*s,%s", namelen, namelen, name, vllist);
+
+	cell = kzalloc(sizeof(struct afs_cell), GFP_KERNEL);
 	if (!cell) {
 		_leave(" = -ENOMEM");
 		return ERR_PTR(-ENOMEM);
 	}
 
-	memcpy(cell->name, name, namelen);
-	cell->name[namelen] = 0;
-
-	atomic_set(&cell->usage, 1);
-	INIT_LIST_HEAD(&cell->link);
 	cell->net = net;
+	cell->name_len = namelen;
+	for (i = 0; i < namelen; i++)
+		cell->name[i] = tolower(name[i]);
+
+	atomic_set(&cell->usage, 2);
+	INIT_WORK(&cell->manager, afs_manage_cell);
 	rwlock_init(&cell->servers_lock);
 	INIT_LIST_HEAD(&cell->servers);
 	init_rwsem(&cell->vl_sem);
 	INIT_LIST_HEAD(&cell->vl_list);
 	spin_lock_init(&cell->vl_lock);
+	seqlock_init(&cell->vl_addrs_lock);
+	cell->flags = (1 << AFS_CELL_FL_NOT_READY);
 
 	for (i = 0; i < AFS_CELL_MAX_ADDRS; i++) {
 		struct sockaddr_rxrpc *srx = &cell->vl_addrs[i];
 		srx->srx_family			= AF_RXRPC;
 		srx->srx_service		= VL_SERVICE;
 		srx->transport_type		= SOCK_DGRAM;
-		srx->transport.sin.sin_port	= htons(AFS_VL_PORT);
+		srx->transport.sin6.sin6_family	= AF_INET6;
+		srx->transport.sin6.sin6_port	= htons(AFS_VL_PORT);
 	}
 
-	/* if the ip address is invalid, try dns query */
-	if (!vllist || strlen(vllist) < 7) {
-		ret = dns_query("afsdb", name, namelen, "ipv4", &dvllist, NULL);
-		if (ret < 0) {
-			if (ret == -ENODATA || ret == -EAGAIN || ret == -ENOKEY)
-				/* translate these errors into something
-				 * userspace might understand */
-				ret = -EDESTADDRREQ;
-			_leave(" = %d", ret);
-			return ERR_PTR(ret);
-		}
-		_vllist = dvllist;
+	/* Fill in the VL server list if we were given a list of addresses to
+	 * use.
+	 */
+	if (vllist) {
+		char delim = ':';
 
-		/* change the delimiter for user-space reply */
-		delimiter = ',';
-
-	} else {
 		if (strchr(vllist, ',') || !strchr(vllist, '.'))
-			delimiter = ',';
-		_vllist = vllist;
+			delim = ',';
+
+		do {
+			struct sockaddr_rxrpc *srx = &cell->vl_addrs[cell->vl_naddrs];
+
+			if (in4_pton(vllist, -1,
+				     (u8 *)&srx->transport.sin6.sin6_addr.s6_addr32[3],
+				     delim, &vllist)) {
+				srx->transport_len = sizeof(struct sockaddr_in6);
+				srx->transport.sin6.sin6_addr.s6_addr32[0] = 0;
+				srx->transport.sin6.sin6_addr.s6_addr32[1] = 0;
+				srx->transport.sin6.sin6_addr.s6_addr32[2] = htonl(0xffff);
+			} else if (in6_pton(vllist, -1,
+					    srx->transport.sin6.sin6_addr.s6_addr,
+					    delim, &vllist)) {
+				srx->transport_len = sizeof(struct sockaddr_in6);
+				srx->transport.sin6.sin6_family	= AF_INET6;
+			} else {
+				goto bad_address;
+			}
+
+			cell->vl_naddrs++;
+			if (!*vllist)
+				break;
+			vllist++;
+
+		} while (cell->vl_naddrs < AFS_CELL_MAX_ADDRS && vllist);
+
+		/* Disable DNS refresh for manually-specified cells */
+		cell->dns_expiry = TIME64_MAX;
+	} else {
+		/* We're going to need to 'refresh' this cell's VL server list
+		 * from the DNS before we can use it.
+		 */
+		cell->dns_expiry = S64_MIN;
 	}
 
-	/* fill in the VL server list from the rest of the string */
-	do {
-		struct sockaddr_rxrpc *srx = &cell->vl_addrs[cell->vl_naddrs];
-		const char *end;
-
-		next = strchr(_vllist, delimiter);
-		if (next)
-			*next++ = 0;
-
-		if (in4_pton(_vllist, -1, (u8 *)&srx->transport.sin6.sin6_addr.s6_addr32[3],
-			     -1, &end)) {
-			srx->transport_len		= sizeof(struct sockaddr_in6);
-			srx->transport.sin6.sin6_family	= AF_INET6;
-			srx->transport.sin6.sin6_flowinfo = 0;
-			srx->transport.sin6.sin6_scope_id = 0;
-			srx->transport.sin6.sin6_addr.s6_addr32[0] = 0;
-			srx->transport.sin6.sin6_addr.s6_addr32[1] = 0;
-			srx->transport.sin6.sin6_addr.s6_addr32[2] = htonl(0xffff);
-		} else if (in6_pton(_vllist, -1, srx->transport.sin6.sin6_addr.s6_addr,
-				    -1, &end)) {
-			srx->transport_len		= sizeof(struct sockaddr_in6);
-			srx->transport.sin6.sin6_family	= AF_INET6;
-			srx->transport.sin6.sin6_flowinfo = 0;
-			srx->transport.sin6.sin6_scope_id = 0;
-		} else {
-			goto bad_address;
-		}
-
-	} while (cell->vl_naddrs++,
-		 cell->vl_naddrs < AFS_CELL_MAX_ADDRS && (_vllist = next));
-
-	/* create a key to represent an anonymous user */
-	memcpy(keyname, "afs@", 4);
-	dp = keyname + 4;
-	cp = cell->name;
-	do {
-		*dp++ = toupper(*cp);
-	} while (*cp++);
-
-	key = rxrpc_get_null_key(keyname);
-	if (IS_ERR(key)) {
-		_debug("no key");
-		ret = PTR_ERR(key);
-		goto error;
-	}
-	cell->anonymous_key = key;
-
-	_debug("anon key %p{%x}",
-	       cell->anonymous_key, key_serial(cell->anonymous_key));
-
 	_leave(" = %p", cell);
 	return cell;
 
 bad_address:
 	printk(KERN_ERR "kAFS: bad VL server IP address\n");
 	ret = -EINVAL;
-error:
-	key_put(cell->anonymous_key);
-	kfree(dvllist);
 	kfree(cell);
 	_leave(" = %d", ret);
 	return ERR_PTR(ret);
 }
 
 /*
- * afs_cell_crate() - create a cell record
+ * afs_lookup_cell - Look up or create a cell record.
  * @net:	The network namespace
- * @name:	is the name of the cell.
- * @namsesz:	is the strlen of the cell name.
- * @vllist:	is a colon separated list of IP addresses in "a.b.c.d" format.
- * @retref:	is T to return the cell reference when the cell exists.
+ * @name:	The name of the cell.
+ * @namesz:	The strlen of the cell name.
+ * @vllist:	A colon/comma separated list of numeric IP addresses or NULL.
+ * @excl:	T if an error should be given if the cell name already exists.
+ *
+ * Look up a cell record by name and query the DNS for VL server addresses if
+ * needed.  Note that the actual DNS query is punted off to the manager thread
+ * so that this function can return immediately if interrupted whilst allowing
+ * cell records to be shared even if not yet fully constructed.
  */
-struct afs_cell *afs_cell_create(struct afs_net *net,
-				 const char *name, unsigned namesz,
-				 char *vllist, bool retref)
+struct afs_cell *afs_lookup_cell(struct afs_net *net,
+				 const char *name, unsigned int namesz,
+				 const char *vllist, bool excl)
 {
-	struct afs_cell *cell;
-	int ret;
+	struct afs_cell *cell, *candidate, *cursor;
+	struct rb_node *parent, **pp;
+	int ret, n;
 
-	_enter("%*.*s,%s", namesz, namesz, name ?: "", vllist);
+	_enter("%s,%s", name, vllist);
 
-	down_write(&net->cells_sem);
-	read_lock(&net->cells_lock);
-	list_for_each_entry(cell, &net->cells, link) {
-		if (strncasecmp(cell->name, name, namesz) == 0)
-			goto duplicate_name;
-	}
-	read_unlock(&net->cells_lock);
-
-	cell = afs_cell_alloc(net, name, namesz, vllist);
-	if (IS_ERR(cell)) {
-		_leave(" = %ld", PTR_ERR(cell));
-		up_write(&net->cells_sem);
-		return cell;
+	if (!excl) {
+		rcu_read_lock();
+		cell = afs_lookup_cell_rcu(net, name, namesz);
+		rcu_read_unlock();
+		if (!IS_ERR(cell)) {
+			if (excl) {
+				afs_put_cell(net, cell);
+				return ERR_PTR(-EEXIST);
+			}
+			goto wait_for_cell;
+		}
 	}
 
-	/* add a proc directory for this cell */
-	ret = afs_proc_cell_setup(net, cell);
-	if (ret < 0)
+	/* Assume we're probably going to create a cell and preallocate and
+	 * mostly set up a candidate record.  We can then use this to stash the
+	 * name, the net namespace and VL server addresses.
+	 *
+	 * We also want to do this before we hold any locks as it may involve
+	 * upcalling to userspace to make DNS queries.
+	 */
+	candidate = afs_alloc_cell(net, name, namesz, vllist);
+	if (IS_ERR(candidate)) {
+		_leave(" = %ld", PTR_ERR(candidate));
+		return candidate;
+	}
+
+	/* Find the insertion point and check to see if someone else added a
+	 * cell whilst we were allocating.
+	 */
+	write_seqlock(&net->cells_lock);
+
+	pp = &net->cells.rb_node;
+	parent = NULL;
+	while (*pp) {
+		parent = *pp;
+		cursor = rb_entry(parent, struct afs_cell, net_node);
+
+		n = strncasecmp(cursor->name, name,
+				min_t(size_t, cursor->name_len, namesz));
+		if (n == 0)
+			n = cursor->name_len - namesz;
+		if (n < 0)
+			pp = &(*pp)->rb_left;
+		else if (n > 0)
+			pp = &(*pp)->rb_right;
+		else
+			goto cell_already_exists;
+	}
+
+	cell = candidate;
+	candidate = NULL;
+	rb_link_node_rcu(&cell->net_node, parent, pp);
+	rb_insert_color(&cell->net_node, &net->cells);
+	atomic_inc(&net->cells_outstanding);
+	write_sequnlock(&net->cells_lock);
+
+	queue_work(afs_wq, &cell->manager);
+
+wait_for_cell:
+	_debug("wait_for_cell");
+	ret = wait_on_bit(&cell->flags, AFS_CELL_FL_NOT_READY, TASK_INTERRUPTIBLE);
+	smp_rmb();
+
+	switch (READ_ONCE(cell->state)) {
+	case AFS_CELL_FAILED:
+		ret = cell->error;
 		goto error;
+	default:
+		_debug("weird %u %d", cell->state, cell->error);
+		goto error;
+	case AFS_CELL_ACTIVE:
+		break;
+	}
 
-#ifdef CONFIG_AFS_FSCACHE
-	/* put it up for caching (this never returns an error) */
-	cell->cache = fscache_acquire_cookie(afs_cache_netfs.primary_index,
-					     &afs_cell_cache_index_def,
-					     cell, true);
-#endif
-
-	/* add to the cell lists */
-	write_lock(&net->cells_lock);
-	list_add_tail(&cell->link, &net->cells);
-	write_unlock(&net->cells_lock);
-
-	down_write(&net->proc_cells_sem);
-	list_add_tail(&cell->proc_link, &net->proc_cells);
-	up_write(&net->proc_cells_sem);
-	up_write(&net->cells_sem);
-
-	_leave(" = %p", cell);
+	_leave(" = %p [cell]", cell);
 	return cell;
 
-error:
-	up_write(&net->cells_sem);
-	key_put(cell->anonymous_key);
-	kfree(cell);
-	_leave(" = %d", ret);
-	return ERR_PTR(ret);
-
-duplicate_name:
-	if (retref && !IS_ERR(cell))
-		afs_get_cell(cell);
-
-	read_unlock(&net->cells_lock);
-	up_write(&net->cells_sem);
-
-	if (retref) {
-		_leave(" = %p", cell);
-		return cell;
+cell_already_exists:
+	_debug("cell exists");
+	cell = cursor;
+	if (excl) {
+		ret = -EEXIST;
+	} else {
+		ASSERTCMP(atomic_read(&cursor->usage), >=, 1);
+		afs_get_cell(cursor);
+		ret = 0;
 	}
-
-	_leave(" = -EEXIST");
-	return ERR_PTR(-EEXIST);
+	write_sequnlock(&net->cells_lock);
+	kfree(candidate);
+	if (ret == 0)
+		goto wait_for_cell;
+error:
+	afs_put_cell(net, cell);
+	_leave(" = %d [error]", ret);
+	return ERR_PTR(ret);
 }
 
 /*
@@ -241,10 +344,11 @@ struct afs_cell *afs_cell_create(struct afs_net *net,
  * - can be called with a module parameter string
  * - can be called from a write to /proc/fs/afs/rootcell
  */
-int afs_cell_init(struct afs_net *net, char *rootcell)
+int afs_cell_init(struct afs_net *net, const char *rootcell)
 {
 	struct afs_cell *old_root, *new_root;
-	char *cp;
+	const char *cp, *vllist;
+	size_t len;
 
 	_enter("");
 
@@ -257,180 +361,127 @@ int afs_cell_init(struct afs_net *net, char *rootcell)
 	}
 
 	cp = strchr(rootcell, ':');
-	if (!cp)
+	if (!cp) {
 		_debug("kAFS: no VL server IP addresses specified");
-	else
-		*cp++ = 0;
+		vllist = NULL;
+		len = strlen(rootcell);
+	} else {
+		vllist = cp + 1;
+		len = cp - rootcell;
+	}
 
 	/* allocate a cell record for the root cell */
-	new_root = afs_cell_create(net, rootcell, strlen(rootcell), cp, false);
+	new_root = afs_lookup_cell(net, rootcell, len, vllist, false);
 	if (IS_ERR(new_root)) {
 		_leave(" = %ld", PTR_ERR(new_root));
 		return PTR_ERR(new_root);
 	}
 
+	set_bit(AFS_CELL_FL_NO_GC, &new_root->flags);
+	afs_get_cell(new_root);
+
 	/* install the new cell */
-	write_lock(&net->cells_lock);
+	write_seqlock(&net->cells_lock);
 	old_root = net->ws_cell;
 	net->ws_cell = new_root;
-	write_unlock(&net->cells_lock);
-	afs_put_cell(net, old_root);
+	write_sequnlock(&net->cells_lock);
 
+	afs_put_cell(net, old_root);
 	_leave(" = 0");
 	return 0;
 }
 
 /*
- * lookup a cell record
+ * Update a cell's VL server address list from the DNS.
  */
-struct afs_cell *afs_cell_lookup(struct afs_net *net,
-				 const char *name, unsigned namesz,
-				 bool dns_cell)
+static void afs_update_cell(struct afs_cell *cell)
 {
-	struct afs_cell *cell;
+	time64_t now, expiry;
+	char *vllist = NULL;
+	int ret;
 
-	_enter("\"%*.*s\",", namesz, namesz, name ?: "");
+	_enter("%s", cell->name);
 
-	down_read(&net->cells_sem);
-	read_lock(&net->cells_lock);
+	ret = dns_query("afsdb", cell->name, cell->name_len,
+			"ipv4", &vllist, &expiry);
+	_debug("query %d", ret);
+	switch (ret) {
+	case 0 ... INT_MAX:
+		clear_bit(AFS_CELL_FL_DNS_FAIL, &cell->flags);
+		clear_bit(AFS_CELL_FL_NOT_FOUND, &cell->flags);
+		goto parse_dns_data;
 
-	if (name) {
-		/* if the cell was named, look for it in the cell record list */
-		list_for_each_entry(cell, &net->cells, link) {
-			if (strncmp(cell->name, name, namesz) == 0) {
-				afs_get_cell(cell);
-				goto found;
-			}
-		}
-		cell = ERR_PTR(-ENOENT);
-		if (dns_cell)
-			goto create_cell;
-	found:
-		;
-	} else {
-		cell = net->ws_cell;
-		if (!cell) {
-			/* this should not happen unless user tries to mount
-			 * when root cell is not set. Return an impossibly
-			 * bizarre errno to alert the user. Things like
-			 * ENOENT might be "more appropriate" but they happen
-			 * for other reasons.
-			 */
-			cell = ERR_PTR(-EDESTADDRREQ);
+	case -ENODATA:
+		clear_bit(AFS_CELL_FL_DNS_FAIL, &cell->flags);
+		set_bit(AFS_CELL_FL_NOT_FOUND, &cell->flags);
+		cell->dns_expiry = ktime_get_real_seconds() + 61;
+		cell->error = -EDESTADDRREQ;
+		goto out;
+
+	case -EAGAIN:
+	case -ECONNREFUSED:
+	default:
+		/* Unable to query DNS. */
+		set_bit(AFS_CELL_FL_DNS_FAIL, &cell->flags);
+		cell->dns_expiry = ktime_get_real_seconds() + 10;
+		cell->error = -EDESTADDRREQ;
+		goto out;
+	}
+
+parse_dns_data:
+	write_seqlock(&cell->vl_addrs_lock);
+
+	ret = -EINVAL;
+	do {
+		struct sockaddr_rxrpc *srx = &cell->vl_addrs[cell->vl_naddrs];
+
+		if (in4_pton(vllist, -1,
+			     (u8 *)&srx->transport.sin6.sin6_addr.s6_addr32[3],
+			     ',', (const char **)&vllist)) {
+			srx->transport_len = sizeof(struct sockaddr_in6);
+			srx->transport.sin6.sin6_addr.s6_addr32[0] = 0;
+			srx->transport.sin6.sin6_addr.s6_addr32[1] = 0;
+			srx->transport.sin6.sin6_addr.s6_addr32[2] = htonl(0xffff);
+		} else if (in6_pton(vllist, -1,
+				    srx->transport.sin6.sin6_addr.s6_addr,
+				    ',', (const char **)&vllist)) {
+			srx->transport_len = sizeof(struct sockaddr_in6);
+			srx->transport.sin6.sin6_family	= AF_INET6;
 		} else {
-			afs_get_cell(cell);
+			goto bad_address;
 		}
 
-	}
+		cell->vl_naddrs++;
+		if (!*vllist)
+			break;
+		vllist++;
 
-	read_unlock(&net->cells_lock);
-	up_read(&net->cells_sem);
-	_leave(" = %p", cell);
-	return cell;
+	} while (cell->vl_naddrs < AFS_CELL_MAX_ADDRS);
 
-create_cell:
-	read_unlock(&net->cells_lock);
-	up_read(&net->cells_sem);
+	if (cell->vl_naddrs < AFS_CELL_MAX_ADDRS)
+		memset(cell->vl_addrs + cell->vl_naddrs, 0,
+		       (AFS_CELL_MAX_ADDRS - cell->vl_naddrs) * sizeof(cell->vl_addrs[0]));
 
-	cell = afs_cell_create(net, name, namesz, NULL, true);
-
-	_leave(" = %p", cell);
-	return cell;
-}
-
-#if 0
-/*
- * try and get a cell record
- */
-struct afs_cell *afs_get_cell_maybe(struct afs_cell *cell)
-{
-	write_lock(&net->cells_lock);
-
-	if (cell && !list_empty(&cell->link))
-		afs_get_cell(cell);
-	else
-		cell = NULL;
-
-	write_unlock(&net->cells_lock);
-	return cell;
-}
-#endif  /*  0  */
-
-/*
- * destroy a cell record
- */
-void afs_put_cell(struct afs_net *net, struct afs_cell *cell)
-{
-	if (!cell)
-		return;
-
-	_enter("%p{%d,%s}", cell, atomic_read(&cell->usage), cell->name);
-
-	ASSERTCMP(atomic_read(&cell->usage), >, 0);
-
-	/* to prevent a race, the decrement and the dequeue must be effectively
-	 * atomic */
-	write_lock(&net->cells_lock);
-
-	if (likely(!atomic_dec_and_test(&cell->usage))) {
-		write_unlock(&net->cells_lock);
-		_leave("");
-		return;
-	}
-
-	ASSERT(list_empty(&cell->servers));
-	ASSERT(list_empty(&cell->vl_list));
-
-	wake_up(&net->cells_freeable_wq);
-
-	write_unlock(&net->cells_lock);
-
-	_leave(" [unused]");
+	now = ktime_get_real_seconds();
+	cell->dns_expiry = expiry;
+	afs_set_cell_timer(cell->net, expiry - now);
+bad_address:
+	write_sequnlock(&cell->vl_addrs_lock);
+out:
+	_leave("");
 }
 
 /*
- * destroy a cell record
- * - must be called with the net->cells_sem write-locked
- * - cell->link should have been broken by the caller
+ * Destroy a cell record
  */
-static void afs_cell_destroy(struct afs_net *net, struct afs_cell *cell)
+static void afs_cell_destroy(struct rcu_head *rcu)
 {
-	_enter("%p{%d,%s}", cell, atomic_read(&cell->usage), cell->name);
+	struct afs_cell *cell = container_of(rcu, struct afs_cell, rcu);
 
-	ASSERTCMP(atomic_read(&cell->usage), >=, 0);
-	ASSERT(list_empty(&cell->link));
+	_enter("%p{%s}", cell, cell->name);
 
-	/* wait for everyone to stop using the cell */
-	if (atomic_read(&cell->usage) > 0) {
-		DECLARE_WAITQUEUE(myself, current);
-
-		_debug("wait for cell %s", cell->name);
-		set_current_state(TASK_UNINTERRUPTIBLE);
-		add_wait_queue(&net->cells_freeable_wq, &myself);
-
-		while (atomic_read(&cell->usage) > 0) {
-			schedule();
-			set_current_state(TASK_UNINTERRUPTIBLE);
-		}
-
-		remove_wait_queue(&net->cells_freeable_wq, &myself);
-		set_current_state(TASK_RUNNING);
-	}
-
-	_debug("cell dead");
 	ASSERTCMP(atomic_read(&cell->usage), ==, 0);
-	ASSERT(list_empty(&cell->servers));
-	ASSERT(list_empty(&cell->vl_list));
 
-	afs_proc_cell_remove(net, cell);
-
-	down_write(&net->proc_cells_sem);
-	list_del_init(&cell->proc_link);
-	up_write(&net->proc_cells_sem);
-
-#ifdef CONFIG_AFS_FSCACHE
-	fscache_relinquish_cookie(cell->cache, 0);
-#endif
 	key_put(cell->anonymous_key);
 	kfree(cell);
 
@@ -438,42 +489,343 @@ static void afs_cell_destroy(struct afs_net *net, struct afs_cell *cell)
 }
 
 /*
- * purge in-memory cell database on module unload or afs_init() failure
- * - the timeout daemon is stopped before calling this
+ * Queue the cell manager.
  */
-void afs_cell_purge(struct afs_net *net)
+static void afs_queue_cell_manager(struct afs_net *net)
 {
-	struct afs_cell *cell;
+	int outstanding = atomic_inc_return(&net->cells_outstanding);
+
+	_enter("%d", outstanding);
+
+	if (!queue_work(afs_wq, &net->cells_manager))
+		afs_dec_cells_outstanding(net);
+}
+
+/*
+ * Cell management timer.  We have an increment on cells_outstanding that we
+ * need to pass along to the work item.
+ */
+void afs_cells_timer(struct timer_list *timer)
+{
+	struct afs_net *net = container_of(timer, struct afs_net, cells_timer);
+
+	_enter("");
+	if (!queue_work(afs_wq, &net->cells_manager))
+		afs_dec_cells_outstanding(net);
+}
+
+/*
+ * Drop a reference on a cell record.
+ */
+void afs_put_cell(struct afs_net *net, struct afs_cell *cell)
+{
+	time64_t now, expire_delay;
+
+	if (!cell)
+		return;
+
+	_enter("%s", cell->name);
+
+	now = ktime_get_real_seconds();
+	cell->last_inactive = now;
+	expire_delay = 0;
+	if (!test_bit(AFS_CELL_FL_DNS_FAIL, &cell->flags) &&
+	    !test_bit(AFS_CELL_FL_NOT_FOUND, &cell->flags))
+		expire_delay = afs_cell_gc_delay;
+
+	if (atomic_dec_return(&cell->usage) > 1)
+		return;
+
+	/* 'cell' may now be garbage collected. */
+	afs_set_cell_timer(net, expire_delay);
+}
+
+/*
+ * Allocate a key to use as a placeholder for anonymous user security.
+ */
+static int afs_alloc_anon_key(struct afs_cell *cell)
+{
+	struct key *key;
+	char keyname[4 + AFS_MAXCELLNAME + 1], *cp, *dp;
+
+	/* Create a key to represent an anonymous user. */
+	memcpy(keyname, "afs@", 4);
+	dp = keyname + 4;
+	cp = cell->name;
+	do {
+		*dp++ = tolower(*cp);
+	} while (*cp++);
+
+	key = rxrpc_get_null_key(keyname);
+	if (IS_ERR(key))
+		return PTR_ERR(key);
+
+	cell->anonymous_key = key;
+
+	_debug("anon key %p{%x}",
+	       cell->anonymous_key, key_serial(cell->anonymous_key));
+	return 0;
+}
+
+/*
+ * Activate a cell.
+ */
+static int afs_activate_cell(struct afs_net *net, struct afs_cell *cell)
+{
+	int ret;
+
+	if (!cell->anonymous_key) {
+		ret = afs_alloc_anon_key(cell);
+		if (ret < 0)
+			return ret;
+	}
+
+#ifdef CONFIG_AFS_FSCACHE
+	cell->cache = fscache_acquire_cookie(afs_cache_netfs.primary_index,
+					     &afs_cell_cache_index_def,
+					     cell, true);
+#endif
+	ret = afs_proc_cell_setup(net, cell);
+	if (ret < 0)
+		return ret;
+	spin_lock(&net->proc_cells_lock);
+	list_add_tail(&cell->proc_link, &net->proc_cells);
+	spin_unlock(&net->proc_cells_lock);
+	return 0;
+}
+
+/*
+ * Deactivate a cell.
+ */
+static void afs_deactivate_cell(struct afs_net *net, struct afs_cell *cell)
+{
+	_enter("%s", cell->name);
+
+	afs_proc_cell_remove(net, cell);
+
+	spin_lock(&net->proc_cells_lock);
+	list_del_init(&cell->proc_link);
+	spin_unlock(&net->proc_cells_lock);
+
+#ifdef CONFIG_AFS_FSCACHE
+	fscache_relinquish_cookie(cell->cache, 0);
+	cell->cache = NULL;
+#endif
+
+	_leave("");
+}
+
+/*
+ * Manage a cell record, initialising and destroying it, maintaining its DNS
+ * records.
+ */
+static void afs_manage_cell(struct work_struct *work)
+{
+	struct afs_cell *cell = container_of(work, struct afs_cell, manager);
+	struct afs_net *net = cell->net;
+	bool deleted;
+	int ret, usage;
+
+	_enter("%s", cell->name);
+
+again:
+	_debug("state %u", cell->state);
+	switch (cell->state) {
+	case AFS_CELL_INACTIVE:
+	case AFS_CELL_FAILED:
+		write_seqlock(&net->cells_lock);
+		usage = 1;
+		deleted = atomic_try_cmpxchg_relaxed(&cell->usage, &usage, 0);
+		if (deleted)
+			rb_erase(&cell->net_node, &net->cells);
+		write_sequnlock(&net->cells_lock);
+		if (deleted)
+			goto final_destruction;
+		if (cell->state == AFS_CELL_FAILED)
+			goto done;
+		cell->state = AFS_CELL_UNSET;
+		goto again;
+
+	case AFS_CELL_UNSET:
+		cell->state = AFS_CELL_ACTIVATING;
+		goto again;
+
+	case AFS_CELL_ACTIVATING:
+		ret = afs_activate_cell(net, cell);
+		if (ret < 0)
+			goto activation_failed;
+
+		cell->state = AFS_CELL_ACTIVE;
+		smp_wmb();
+		clear_bit(AFS_CELL_FL_NOT_READY, &cell->flags);
+		wake_up_bit(&cell->flags, AFS_CELL_FL_NOT_READY);
+		goto again;
+
+	case AFS_CELL_ACTIVE:
+		if (atomic_read(&cell->usage) > 1) {
+			time64_t now = ktime_get_real_seconds();
+			if (cell->dns_expiry <= now && net->live)
+				afs_update_cell(cell);
+			goto done;
+		}
+		cell->state = AFS_CELL_DEACTIVATING;
+		goto again;
+
+	case AFS_CELL_DEACTIVATING:
+		set_bit(AFS_CELL_FL_NOT_READY, &cell->flags);
+		if (atomic_read(&cell->usage) > 1)
+			goto reverse_deactivation;
+		afs_deactivate_cell(net, cell);
+		cell->state = AFS_CELL_INACTIVE;
+		goto again;
+
+	default:
+		break;
+	}
+	_debug("bad state %u", cell->state);
+	BUG(); /* Unhandled state */
+
+activation_failed:
+	cell->error = ret;
+	afs_deactivate_cell(net, cell);
+
+	cell->state = AFS_CELL_FAILED;
+	smp_wmb();
+	if (test_and_clear_bit(AFS_CELL_FL_NOT_READY, &cell->flags))
+		wake_up_bit(&cell->flags, AFS_CELL_FL_NOT_READY);
+	goto again;
+
+reverse_deactivation:
+	cell->state = AFS_CELL_ACTIVE;
+	smp_wmb();
+	clear_bit(AFS_CELL_FL_NOT_READY, &cell->flags);
+	wake_up_bit(&cell->flags, AFS_CELL_FL_NOT_READY);
+	_leave(" [deact->act]");
+	return;
+
+done:
+	_leave(" [done %u]", cell->state);
+	return;
+
+final_destruction:
+	call_rcu(&cell->rcu, afs_cell_destroy);
+	afs_dec_cells_outstanding(net);
+	_leave(" [destruct %d]", atomic_read(&net->cells_outstanding));
+}
+
+/*
+ * Manage the records of cells known to a network namespace.  This includes
+ * updating the DNS records and garbage collecting unused cells that were
+ * automatically added.
+ *
+ * Note that constructed cell records may only be removed from net->cells by
+ * this work item, so it is safe for this work item to stash a cursor pointing
+ * into the tree and then return to caller (provided it skips cells that are
+ * still under construction).
+ *
+ * Note also that we were given an increment on net->cells_outstanding by
+ * whoever queued us that we need to deal with before returning.
+ */
+void afs_manage_cells(struct work_struct *work)
+{
+	struct afs_net *net = container_of(work, struct afs_net, cells_manager);
+	struct rb_node *cursor;
+	time64_t now = ktime_get_real_seconds(), next_manage = TIME64_MAX;
+	bool purging = !net->live;
 
 	_enter("");
 
-	afs_put_cell(net, net->ws_cell);
+	/* Trawl the cell database looking for cells that have expired from
+	 * lack of use and cells whose DNS results have expired and dispatch
+	 * their managers.
+	 */
+	read_seqlock_excl(&net->cells_lock);
 
-	down_write(&net->cells_sem);
+	for (cursor = rb_first(&net->cells); cursor; cursor = rb_next(cursor)) {
+		struct afs_cell *cell =
+			rb_entry(cursor, struct afs_cell, net_node);
+		unsigned usage;
+		bool sched_cell = false;
 
-	while (!list_empty(&net->cells)) {
-		cell = NULL;
+		usage = atomic_read(&cell->usage);
+		_debug("manage %s %u", cell->name, usage);
 
-		/* remove the next cell from the front of the list */
-		write_lock(&net->cells_lock);
+		ASSERTCMP(usage, >=, 1);
 
-		if (!list_empty(&net->cells)) {
-			cell = list_entry(net->cells.next,
-					  struct afs_cell, link);
-			list_del_init(&cell->link);
+		if (purging) {
+			if (test_and_clear_bit(AFS_CELL_FL_NO_GC, &cell->flags))
+				usage = atomic_dec_return(&cell->usage);
+			ASSERTCMP(usage, ==, 1);
 		}
 
-		write_unlock(&net->cells_lock);
+		if (usage == 1) {
+			time64_t expire_at = cell->last_inactive;
 
-		if (cell) {
-			_debug("PURGING CELL %s (%d)",
-			       cell->name, atomic_read(&cell->usage));
+			if (!test_bit(AFS_CELL_FL_DNS_FAIL, &cell->flags) &&
+			    !test_bit(AFS_CELL_FL_NOT_FOUND, &cell->flags))
+				expire_at += afs_cell_gc_delay;
+			if (purging || expire_at <= now)
+				sched_cell = true;
+			else if (expire_at < next_manage)
+				next_manage = expire_at;
+		}
 
-			/* now the cell should be left with no references */
-			afs_cell_destroy(net, cell);
+		if (!purging) {
+			if (cell->dns_expiry <= now)
+				sched_cell = true;
+			else if (cell->dns_expiry <= next_manage)
+				next_manage = cell->dns_expiry;
+		}
+
+		if (sched_cell)
+			queue_work(afs_wq, &cell->manager);
+	}
+
+	read_sequnlock_excl(&net->cells_lock);
+
+	/* Update the timer on the way out.  We have to pass an increment on
+	 * cells_outstanding in the namespace that we are in to the timer or
+	 * the work scheduler.
+	 */
+	if (!purging && next_manage < TIME64_MAX) {
+		now = ktime_get_real_seconds();
+
+		if (next_manage - now <= 0) {
+			if (queue_work(afs_wq, &net->cells_manager))
+				atomic_inc(&net->cells_outstanding);
+		} else {
+			afs_set_cell_timer(net, next_manage - now);
 		}
 	}
 
-	up_write(&net->cells_sem);
+	afs_dec_cells_outstanding(net);
+	_leave(" [%d]", atomic_read(&net->cells_outstanding));
+}
+
+/*
+ * Purge in-memory cell database.
+ */
+void afs_cell_purge(struct afs_net *net)
+{
+	struct afs_cell *ws;
+
+	_enter("");
+
+	write_seqlock(&net->cells_lock);
+	ws = net->ws_cell;
+	net->ws_cell = NULL;
+	write_sequnlock(&net->cells_lock);
+	afs_put_cell(net, ws);
+
+	_debug("del timer");
+	if (del_timer_sync(&net->cells_timer))
+		atomic_dec(&net->cells_outstanding);
+
+	_debug("kick mgr");
+	afs_queue_cell_manager(net);
+
+	_debug("wait");
+	wait_on_atomic_t(&net->cells_outstanding, atomic_t_wait,
+			 TASK_UNINTERRUPTIBLE);
 	_leave("");
 }