ceph: cap tracking for async directory operations

Track and correctly handle directory caps for asynchronous operations.
Add aliases for Frc caps that we now designate at Dcu caps (when dealing
with directories).

Unlike file caps, we don't reclaim these when the session goes away, and
instead preemptively release them. In-flight async dirops are instead
handled during reconnect phase. The client needs to re-do a synchronous
operation in order to re-get directory caps.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
Reviewed-by: "Yan, Zheng" <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index c433655..9ffb2ee 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -699,6 +699,7 @@ void ceph_mdsc_release_request(struct kref *kref)
 	struct ceph_mds_request *req = container_of(kref,
 						    struct ceph_mds_request,
 						    r_kref);
+	ceph_mdsc_release_dir_caps(req);
 	destroy_reply_info(&req->r_reply_info);
 	if (req->r_request)
 		ceph_msg_put(req->r_request);
@@ -3280,6 +3281,17 @@ static void handle_session(struct ceph_mds_session *session,
 	return;
 }
 
+void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
+{
+	int dcaps;
+
+	dcaps = xchg(&req->r_dir_caps, 0);
+	if (dcaps) {
+		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
+		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
+	}
+}
+
 /*
  * called under session->mutex.
  */
@@ -3307,9 +3319,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 			continue;
 		if (req->r_attempts == 0)
 			continue; /* only old requests */
-		if (req->r_session &&
-		    req->r_session->s_mds == session->s_mds)
-			__send_request(mdsc, session, req, true);
+		if (!req->r_session)
+			continue;
+		if (req->r_session->s_mds != session->s_mds)
+			continue;
+
+		ceph_mdsc_release_dir_caps(req);
+
+		__send_request(mdsc, session, req, true);
 	}
 	mutex_unlock(&mdsc->mutex);
 }
@@ -3393,7 +3410,7 @@ static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
 /*
  * Encode information about a cap for a reconnect with the MDS.
  */
-static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
+static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
 			  void *arg)
 {
 	union {
@@ -3416,6 +3433,10 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 	cap->mseq = 0;       /* and migrate_seq */
 	cap->cap_gen = cap->session->s_cap_gen;
 
+	/* These are lost when the session goes away */
+	if (S_ISDIR(inode->i_mode))
+		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
+
 	if (recon_state->msg_version >= 2) {
 		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
 		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
@@ -3712,7 +3733,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 		recon_state.msg_version = 2;
 	}
 	/* trsaverse this session's caps */
-	err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
+	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
 
 	spin_lock(&session->s_cap_lock);
 	session->s_cap_reconnect = 0;