afs: Overhaul the callback handling

Overhaul the AFS callback handling by the following means:

 (1) Don't give up callback promises on vnodes that we are no longer using,
     rather let them just expire on the server or let the server break
     them.  This is actually more efficient for the server as the callback
     lookup is expensive if there are lots of extant callbacks.

 (2) Only give up the callback promises we have from a server when the
     server record is destroyed.  Then we can just give up *all* the
     callback promises on it in one go.

 (3) Servers can end up being shared between cells if cells are aliased, so
     don't add all the vnodes being backed by a particular server into a
     big FID-indexed tree on that server as there may be duplicates.

     Instead have each volume instance (~= superblock) register an interest
     in a server as it starts to make use of it and use this to allow the
     processor for callbacks from the server to find the superblock and
     thence the inode corresponding to the FID being broken by means of
     ilookup_nowait().

 (4) Rather than iterating over the entire callback list when a mass-break
     comes in from the server, maintain a counter of mass-breaks in
     afs_server (cb_seq) and make afs_validate() check it against the copy
     in afs_vnode.

     It would be nice not to have to take a read_lock whilst doing this,
     but that's tricky without using RCU.

 (5) Save a ref on the fileserver we're using for a call in the afs_call
     struct so that we can access its cb_s_break during call decoding.

 (6) Write-lock around callback and status storage in a vnode and read-lock
     around getattr so that we don't see the status mid-update.

This has the following consequences:

 (1) Data invalidation isn't seen until someone calls afs_validate() on a
     vnode.  Unfortunately, we need to use a key to query the server, but
     getting one from a background thread is tricky without caching loads
     of keys all over the place.

 (2) Mass invalidation isn't seen until someone calls afs_validate().

 (3) Callback breaking is going to hit the inode_hash_lock quite a bit.
     Could this be replaced with rcu_read_lock() since inodes are destroyed
     under RCU conditions.

Signed-off-by: David Howells <dhowells@redhat.com>
diff --git a/fs/afs/fsclient.c b/fs/afs/fsclient.c
index 36f58ad..c665840 100644
--- a/fs/afs/fsclient.c
+++ b/fs/afs/fsclient.c
@@ -22,6 +22,11 @@
  */
 static u8 afs_discard_buffer[64];
 
+static inline void afs_use_fs_server(struct afs_call *call, struct afs_server *server)
+{
+	call->server = afs_get_server(server);
+}
+
 /*
  * decode an AFSFid block
  */
@@ -47,14 +52,17 @@ static void xdr_decode_AFSFetchStatus(const __be32 **_bp,
 	const __be32 *bp = *_bp;
 	umode_t mode;
 	u64 data_version, size;
-	u32 changed = 0; /* becomes non-zero if ctime-type changes seen */
+	bool changed = false;
 	kuid_t owner;
 	kgid_t group;
 
+	write_seqlock(&vnode->cb_lock);
+
 #define EXTRACT(DST)				\
 	do {					\
 		u32 x = ntohl(*bp++);		\
-		changed |= DST - x;		\
+		if (DST != x)			\
+			changed |= true;	\
 		DST = x;			\
 	} while (0)
 
@@ -127,25 +135,39 @@ static void xdr_decode_AFSFetchStatus(const __be32 **_bp,
 			_debug("vnode modified %llx on {%x:%u}",
 			       (unsigned long long) data_version,
 			       vnode->fid.vid, vnode->fid.vnode);
-			set_bit(AFS_VNODE_MODIFIED, &vnode->flags);
+			set_bit(AFS_VNODE_DIR_MODIFIED, &vnode->flags);
 			set_bit(AFS_VNODE_ZAP_DATA, &vnode->flags);
 		}
 	} else if (store_version) {
 		status->data_version = data_version;
 	}
+
+	write_sequnlock(&vnode->cb_lock);
 }
 
 /*
  * decode an AFSCallBack block
  */
-static void xdr_decode_AFSCallBack(const __be32 **_bp, struct afs_vnode *vnode)
+static void xdr_decode_AFSCallBack(struct afs_call *call,
+				   struct afs_vnode *vnode,
+				   const __be32 **_bp)
 {
 	const __be32 *bp = *_bp;
+	u32 cb_expiry;
 
-	vnode->cb_version	= ntohl(*bp++);
-	vnode->cb_expiry	= ntohl(*bp++);
-	vnode->cb_type		= ntohl(*bp++);
-	vnode->cb_expires	= vnode->cb_expiry + ktime_get_real_seconds();
+	write_seqlock(&vnode->cb_lock);
+
+	if (call->cb_break == (vnode->cb_break + call->server->cb_s_break)) {
+		vnode->cb_version	= ntohl(*bp++);
+		cb_expiry		= ntohl(*bp++);
+		vnode->cb_type		= ntohl(*bp++);
+		vnode->cb_expires_at	= cb_expiry + ktime_get_real_seconds();
+		set_bit(AFS_VNODE_CB_PROMISED, &vnode->flags);
+	} else {
+		bp += 3;
+	}
+
+	write_sequnlock(&vnode->cb_lock);
 	*_bp = bp;
 }
 
@@ -247,16 +269,16 @@ static int afs_deliver_fs_fetch_status(struct afs_call *call)
 	const __be32 *bp;
 	int ret;
 
-	_enter("");
-
 	ret = afs_transfer_reply(call);
 	if (ret < 0)
 		return ret;
 
+	_enter("{%x:%u}", vnode->fid.vid, vnode->fid.vnode);
+
 	/* unmarshall the reply once we've received all of it */
 	bp = call->buffer;
 	xdr_decode_AFSFetchStatus(&bp, &vnode->status, vnode, NULL);
-	xdr_decode_AFSCallBack(&bp, vnode);
+	xdr_decode_AFSCallBack(call, vnode, &bp);
 	if (call->reply[1])
 		xdr_decode_AFSVolSync(&bp, call->reply[1]);
 
@@ -304,6 +326,8 @@ int afs_fs_fetch_file_status(struct afs_server *server,
 	bp[2] = htonl(vnode->fid.vnode);
 	bp[3] = htonl(vnode->fid.unique);
 
+	call->cb_break = vnode->cb_break + server->cb_s_break;
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -429,7 +453,7 @@ static int afs_deliver_fs_fetch_data(struct afs_call *call)
 
 		bp = call->buffer;
 		xdr_decode_AFSFetchStatus(&bp, &vnode->status, vnode, NULL);
-		xdr_decode_AFSCallBack(&bp, vnode);
+		xdr_decode_AFSCallBack(call, vnode, &bp);
 		if (call->reply[1])
 			xdr_decode_AFSVolSync(&bp, call->reply[1]);
 
@@ -513,6 +537,8 @@ static int afs_fs_fetch_data64(struct afs_server *server,
 	bp[7] = htonl(lower_32_bits(req->len));
 
 	atomic_inc(&req->usage);
+	call->cb_break = vnode->cb_break + server->cb_s_break;
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -556,87 +582,8 @@ int afs_fs_fetch_data(struct afs_server *server,
 	bp[5] = htonl(lower_32_bits(req->len));
 
 	atomic_inc(&req->usage);
-	return afs_make_call(&server->addr, call, GFP_NOFS, async);
-}
-
-/*
- * deliver reply data to an FS.GiveUpCallBacks
- */
-static int afs_deliver_fs_give_up_callbacks(struct afs_call *call)
-{
-	_enter("");
-
-	/* shouldn't be any reply data */
-	return afs_extract_data(call, NULL, 0, false);
-}
-
-/*
- * FS.GiveUpCallBacks operation type
- */
-static const struct afs_call_type afs_RXFSGiveUpCallBacks = {
-	.name		= "FS.GiveUpCallBacks",
-	.deliver	= afs_deliver_fs_give_up_callbacks,
-	.destructor	= afs_flat_call_destructor,
-};
-
-/*
- * give up a set of callbacks
- * - the callbacks are held in the server->cb_break ring
- */
-int afs_fs_give_up_callbacks(struct afs_net *net,
-			     struct afs_server *server,
-			     bool async)
-{
-	struct afs_call *call;
-	size_t ncallbacks;
-	__be32 *bp, *tp;
-	int loop;
-
-	ncallbacks = CIRC_CNT(server->cb_break_head, server->cb_break_tail,
-			      ARRAY_SIZE(server->cb_break));
-
-	_enter("{%zu},", ncallbacks);
-
-	if (ncallbacks == 0)
-		return 0;
-	if (ncallbacks > AFSCBMAX)
-		ncallbacks = AFSCBMAX;
-
-	_debug("break %zu callbacks", ncallbacks);
-
-	call = afs_alloc_flat_call(net, &afs_RXFSGiveUpCallBacks,
-				   12 + ncallbacks * 6 * 4, 0);
-	if (!call)
-		return -ENOMEM;
-
-
-	/* marshall the parameters */
-	bp = call->request;
-	tp = bp + 2 + ncallbacks * 3;
-	*bp++ = htonl(FSGIVEUPCALLBACKS);
-	*bp++ = htonl(ncallbacks);
-	*tp++ = htonl(ncallbacks);
-
-	atomic_sub(ncallbacks, &server->cb_break_n);
-	for (loop = ncallbacks; loop > 0; loop--) {
-		struct afs_callback *cb =
-			&server->cb_break[server->cb_break_tail];
-
-		*bp++ = htonl(cb->fid.vid);
-		*bp++ = htonl(cb->fid.vnode);
-		*bp++ = htonl(cb->fid.unique);
-		*tp++ = htonl(cb->version);
-		*tp++ = htonl(cb->expiry);
-		*tp++ = htonl(cb->type);
-		smp_mb();
-		server->cb_break_tail =
-			(server->cb_break_tail + 1) &
-			(ARRAY_SIZE(server->cb_break) - 1);
-	}
-
-	ASSERT(ncallbacks > 0);
-	wake_up_nr(&server->cb_break_waitq, ncallbacks);
-
+	call->cb_break = vnode->cb_break + server->cb_s_break;
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -731,6 +678,7 @@ int afs_fs_create(struct afs_server *server,
 	*bp++ = htonl(mode & S_IALLUGO); /* unix mode */
 	*bp++ = 0; /* segment size */
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -809,6 +757,7 @@ int afs_fs_remove(struct afs_server *server,
 		bp = (void *) bp + padsz;
 	}
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -892,6 +841,7 @@ int afs_fs_link(struct afs_server *server,
 	*bp++ = htonl(vnode->fid.vnode);
 	*bp++ = htonl(vnode->fid.unique);
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -994,6 +944,7 @@ int afs_fs_symlink(struct afs_server *server,
 	*bp++ = htonl(S_IRWXUGO); /* unix mode */
 	*bp++ = 0; /* segment size */
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -1095,6 +1046,7 @@ int afs_fs_rename(struct afs_server *server,
 		bp = (void *) bp + n_padsz;
 	}
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -1196,6 +1148,7 @@ static int afs_fs_store_data64(struct afs_server *server,
 	*bp++ = htonl(i_size >> 32);
 	*bp++ = htonl((u32) i_size);
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -1269,6 +1222,7 @@ int afs_fs_store_data(struct afs_server *server, struct afs_writeback *wb,
 	*bp++ = htonl(size);
 	*bp++ = htonl(i_size);
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -1366,6 +1320,7 @@ static int afs_fs_setattr_size64(struct afs_server *server, struct key *key,
 	*bp++ = htonl(attr->ia_size >> 32);	/* new file length */
 	*bp++ = htonl((u32) attr->ia_size);
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -1413,6 +1368,7 @@ static int afs_fs_setattr_size(struct afs_server *server, struct key *key,
 	*bp++ = 0;				/* size of write */
 	*bp++ = htonl(attr->ia_size);		/* new file length */
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -1454,6 +1410,7 @@ int afs_fs_setattr(struct afs_server *server, struct key *key,
 
 	xdr_encode_AFS_StoreStatus(&bp, attr);
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -1684,6 +1641,7 @@ int afs_fs_get_volume_status(struct afs_server *server,
 	bp[0] = htonl(FSGETVOLUMESTATUS);
 	bp[1] = htonl(vnode->fid.vid);
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -1766,6 +1724,7 @@ int afs_fs_set_lock(struct afs_server *server,
 	*bp++ = htonl(vnode->fid.unique);
 	*bp++ = htonl(type);
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -1797,6 +1756,7 @@ int afs_fs_extend_lock(struct afs_server *server,
 	*bp++ = htonl(vnode->fid.vnode);
 	*bp++ = htonl(vnode->fid.unique);
 
+	afs_use_fs_server(call, server);
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }
 
@@ -1828,5 +1788,49 @@ int afs_fs_release_lock(struct afs_server *server,
 	*bp++ = htonl(vnode->fid.vnode);
 	*bp++ = htonl(vnode->fid.unique);
 
+	afs_use_fs_server(call, server);
+	return afs_make_call(&server->addr, call, GFP_NOFS, async);
+}
+
+/*
+ * Deliver reply data to an FS.GiveUpAllCallBacks operation.
+ */
+static int afs_deliver_fs_give_up_all_callbacks(struct afs_call *call)
+{
+	return afs_transfer_reply(call);
+}
+
+/*
+ * FS.GiveUpAllCallBacks operation type
+ */
+static const struct afs_call_type afs_RXFSGiveUpAllCallBacks = {
+	.name		= "FS.GiveUpAllCallBacks",
+	.deliver	= afs_deliver_fs_give_up_all_callbacks,
+	.destructor	= afs_flat_call_destructor,
+};
+
+/*
+ * Flush all the callbacks we have on a server.
+ */
+int afs_fs_give_up_all_callbacks(struct afs_server *server,
+				 struct key *key,
+				 bool async)
+{
+	struct afs_call *call;
+	__be32 *bp;
+
+	_enter("");
+
+	call = afs_alloc_flat_call(server->net, &afs_RXFSGiveUpAllCallBacks, 2 * 4, 0);
+	if (!call)
+		return -ENOMEM;
+
+	call->key = key;
+
+	/* marshall the parameters */
+	bp = call->request;
+	*bp++ = htonl(FSGIVEUPALLCALLBACKS);
+
+	/* Can't take a ref on server */
 	return afs_make_call(&server->addr, call, GFP_NOFS, async);
 }