ceph: unify cap flush and snapcap flush

This patch includes following changes
- Assign flush tid to snapcap flush
- Remove session's s_cap_snaps_flushing list. Add inode to session's
  s_cap_flushing list instead. Inode is removed from the list when
  there is no pending snapcap flush or cap flush.
- make __kick_flushing_caps() re-send both snapcap flushes and cap
  flushes.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index e0efa75..0ac6047 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -40,6 +40,7 @@
  * cluster to release server state.
  */
 
+static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc);
 
 /*
  * Generate readable cap strings for debugging output.
@@ -1217,6 +1218,22 @@
 	return delayed;
 }
 
+static inline int __send_flush_snap(struct inode *inode,
+				    struct ceph_mds_session *session,
+				    struct ceph_cap_snap *capsnap,
+				    u32 mseq, u64 oldest_flush_tid)
+{
+	return send_cap_msg(session, ceph_vino(inode).ino, 0,
+			CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
+			capsnap->dirty, 0, capsnap->cap_flush.tid,
+			oldest_flush_tid, 0, mseq, capsnap->size, 0,
+			&capsnap->mtime, &capsnap->atime,
+			&capsnap->ctime, capsnap->time_warp_seq,
+			capsnap->uid, capsnap->gid, capsnap->mode,
+			capsnap->xattr_version, capsnap->xattr_blob,
+			capsnap->follows, capsnap->inline_data);
+}
+
 /*
  * When a snapshot is taken, clients accumulate dirty metadata on
  * inodes with capabilities in ceph_cap_snaps to describe the file
@@ -1224,14 +1241,10 @@
  * asynchronously back to the MDS once sync writes complete and dirty
  * data is written out.
  *
- * Unless @kick is true, skip cap_snaps that were already sent to
- * the MDS (i.e., during this session).
- *
  * Called under i_ceph_lock.  Takes s_mutex as needed.
  */
 void __ceph_flush_snaps(struct ceph_inode_info *ci,
-			struct ceph_mds_session **psession,
-			int kick)
+			struct ceph_mds_session **psession)
 		__releases(ci->i_ceph_lock)
 		__acquires(ci->i_ceph_lock)
 {
@@ -1242,6 +1255,7 @@
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	struct ceph_mds_session *session = NULL; /* if session != NULL, we hold
 						    session->s_mutex */
+	u64 oldest_flush_tid;
 	u64 next_follows = 0;  /* keep track of how far we've gotten through the
 			     i_cap_snaps list, and skip these entries next time
 			     around to avoid an infinite loop */
@@ -1272,7 +1286,7 @@
 		}
 
 		/* only flush each capsnap once */
-		if (!kick && !list_empty(&capsnap->flushing_item)) {
+		if (capsnap->cap_flush.tid > 0) {
 			dout("already flushed %p, skipping\n", capsnap);
 			continue;
 		}
@@ -1282,8 +1296,6 @@
 
 		if (session && session->s_mds != mds) {
 			dout("oops, wrong session %p mutex\n", session);
-			if (kick)
-				goto out;
 
 			mutex_unlock(&session->s_mutex);
 			ceph_put_mds_session(session);
@@ -1309,26 +1321,27 @@
 		}
 
 		spin_lock(&mdsc->cap_dirty_lock);
-		capsnap->flush_tid = ++mdsc->last_cap_flush_tid;
+		capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
+		list_add_tail(&capsnap->cap_flush.g_list,
+			      &mdsc->cap_flush_list);
+		oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+
+		if (list_empty(&ci->i_flushing_item)) {
+			list_add_tail(&ci->i_flushing_item,
+				      &session->s_cap_flushing);
+		}
 		spin_unlock(&mdsc->cap_dirty_lock);
 
+		list_add_tail(&capsnap->cap_flush.i_list,
+			      &ci->i_cap_flush_list);
+
 		atomic_inc(&capsnap->nref);
-		if (list_empty(&capsnap->flushing_item))
-			list_add_tail(&capsnap->flushing_item,
-				      &session->s_cap_snaps_flushing);
 		spin_unlock(&ci->i_ceph_lock);
 
 		dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n",
-		     inode, capsnap, capsnap->follows, capsnap->flush_tid);
-		send_cap_msg(session, ceph_vino(inode).ino, 0,
-			     CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
-			     capsnap->dirty, 0, capsnap->flush_tid, 0,
-			     0, mseq, capsnap->size, 0,
-			     &capsnap->mtime, &capsnap->atime,
-			     &capsnap->ctime, capsnap->time_warp_seq,
-			     capsnap->uid, capsnap->gid, capsnap->mode,
-			     capsnap->xattr_version, capsnap->xattr_blob,
-			     capsnap->follows, capsnap->inline_data);
+		     inode, capsnap, capsnap->follows, capsnap->cap_flush.tid);
+		__send_flush_snap(inode, session, capsnap, mseq,
+				  oldest_flush_tid);
 
 		next_follows = capsnap->follows + 1;
 		ceph_put_cap_snap(capsnap);
@@ -1354,7 +1367,7 @@
 static void ceph_flush_snaps(struct ceph_inode_info *ci)
 {
 	spin_lock(&ci->i_ceph_lock);
-	__ceph_flush_snaps(ci, NULL, 0);
+	__ceph_flush_snaps(ci, NULL);
 	spin_unlock(&ci->i_ceph_lock);
 }
 
@@ -1476,11 +1489,6 @@
 	if (list_empty(&ci->i_flushing_item)) {
 		list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
 		mdsc->num_cap_flushing++;
-		dout(" inode %p now flushing tid %llu\n", inode, cf->tid);
-	} else {
-		list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
-		dout(" inode %p now flushing (more) tid %llu\n",
-		     inode, cf->tid);
 	}
 	spin_unlock(&mdsc->cap_dirty_lock);
 
@@ -1556,7 +1564,7 @@
 
 	/* flush snaps first time around only */
 	if (!list_empty(&ci->i_cap_snaps))
-		__ceph_flush_snaps(ci, &session, 0);
+		__ceph_flush_snaps(ci, &session);
 	goto retry_locked;
 retry:
 	spin_lock(&ci->i_ceph_lock);
@@ -1997,80 +2005,74 @@
 	return err;
 }
 
-/*
- * After a recovering MDS goes active, we need to resend any caps
- * we were flushing.
- *
- * Caller holds session->s_mutex.
- */
-static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
-				   struct ceph_mds_session *session)
-{
-	struct ceph_cap_snap *capsnap;
-
-	dout("kick_flushing_capsnaps mds%d\n", session->s_mds);
-	list_for_each_entry(capsnap, &session->s_cap_snaps_flushing,
-			    flushing_item) {
-		struct ceph_inode_info *ci = capsnap->ci;
-		struct inode *inode = &ci->vfs_inode;
-		struct ceph_cap *cap;
-
-		spin_lock(&ci->i_ceph_lock);
-		cap = ci->i_auth_cap;
-		if (cap && cap->session == session) {
-			dout("kick_flushing_caps %p cap %p capsnap %p\n", inode,
-			     cap, capsnap);
-			__ceph_flush_snaps(ci, &session, 1);
-		} else {
-			pr_err("%p auth cap %p not mds%d ???\n", inode,
-			       cap, session->s_mds);
-		}
-		spin_unlock(&ci->i_ceph_lock);
-	}
-}
-
-static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
-				struct ceph_mds_session *session,
-				struct ceph_inode_info *ci)
+static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
+				 struct ceph_mds_session *session,
+				 struct ceph_inode_info *ci,
+				 u64 oldest_flush_tid)
+	__releases(ci->i_ceph_lock)
+	__acquires(ci->i_ceph_lock)
 {
 	struct inode *inode = &ci->vfs_inode;
 	struct ceph_cap *cap;
 	struct ceph_cap_flush *cf;
-	int delayed = 0;
+	int ret;
 	u64 first_tid = 0;
-	u64 oldest_flush_tid;
 
-	spin_lock(&mdsc->cap_dirty_lock);
-	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
-	spin_unlock(&mdsc->cap_dirty_lock);
-
-	spin_lock(&ci->i_ceph_lock);
 	list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
 		if (cf->tid < first_tid)
 			continue;
 
 		cap = ci->i_auth_cap;
 		if (!(cap && cap->session == session)) {
-			pr_err("%p auth cap %p not mds%d ???\n", inode,
-					cap, session->s_mds);
-			spin_unlock(&ci->i_ceph_lock);
+			pr_err("%p auth cap %p not mds%d ???\n",
+			       inode, cap, session->s_mds);
 			break;
 		}
 
 		first_tid = cf->tid + 1;
 
-		dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode,
-		     cap, cf->tid, ceph_cap_string(cf->caps));
-		delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-				      __ceph_caps_used(ci),
-				      __ceph_caps_wanted(ci),
-				      cap->issued | cap->implemented,
-				      cf->caps, cf->tid, oldest_flush_tid);
+		if (cf->caps) {
+			dout("kick_flushing_caps %p cap %p tid %llu %s\n",
+			     inode, cap, cf->tid, ceph_cap_string(cf->caps));
+			ci->i_ceph_flags |= CEPH_I_NODELAY;
+			ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+					  __ceph_caps_used(ci),
+					  __ceph_caps_wanted(ci),
+					  cap->issued | cap->implemented,
+					  cf->caps, cf->tid, oldest_flush_tid);
+			if (ret) {
+				pr_err("kick_flushing_caps: error sending "
+					"cap flush, ino (%llx.%llx) "
+					"tid %llu flushing %s\n",
+					ceph_vinop(inode), cf->tid,
+					ceph_cap_string(cf->caps));
+			}
+		} else {
+			struct ceph_cap_snap *capsnap =
+					container_of(cf, struct ceph_cap_snap,
+						    cap_flush);
+			dout("kick_flushing_caps %p capsnap %p tid %llu %s\n",
+			     inode, capsnap, cf->tid,
+			     ceph_cap_string(capsnap->dirty));
+
+			atomic_inc(&capsnap->nref);
+			spin_unlock(&ci->i_ceph_lock);
+
+			ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
+						oldest_flush_tid);
+			if (ret < 0) {
+				pr_err("kick_flushing_caps: error sending "
+					"cap flushsnap, ino (%llx.%llx) "
+					"tid %llu follows %llu\n",
+					ceph_vinop(inode), cf->tid,
+					capsnap->follows);
+			}
+
+			ceph_put_cap_snap(capsnap);
+		}
 
 		spin_lock(&ci->i_ceph_lock);
 	}
-	spin_unlock(&ci->i_ceph_lock);
-	return delayed;
 }
 
 void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
@@ -2078,8 +2080,14 @@
 {
 	struct ceph_inode_info *ci;
 	struct ceph_cap *cap;
+	u64 oldest_flush_tid;
 
 	dout("early_kick_flushing_caps mds%d\n", session->s_mds);
+
+	spin_lock(&mdsc->cap_dirty_lock);
+	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+	spin_unlock(&mdsc->cap_dirty_lock);
+
 	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
 		spin_lock(&ci->i_ceph_lock);
 		cap = ci->i_auth_cap;
@@ -2099,10 +2107,8 @@
 		 */
 		if ((cap->issued & ci->i_flushing_caps) !=
 		    ci->i_flushing_caps) {
-			spin_unlock(&ci->i_ceph_lock);
-			if (!__kick_flushing_caps(mdsc, session, ci))
-				continue;
-			spin_lock(&ci->i_ceph_lock);
+			__kick_flushing_caps(mdsc, session, ci,
+					     oldest_flush_tid);
 		}
 
 		spin_unlock(&ci->i_ceph_lock);
@@ -2113,50 +2119,43 @@
 			     struct ceph_mds_session *session)
 {
 	struct ceph_inode_info *ci;
-
-	kick_flushing_capsnaps(mdsc, session);
+	u64 oldest_flush_tid;
 
 	dout("kick_flushing_caps mds%d\n", session->s_mds);
+
+	spin_lock(&mdsc->cap_dirty_lock);
+	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+	spin_unlock(&mdsc->cap_dirty_lock);
+
 	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
-		int delayed = __kick_flushing_caps(mdsc, session, ci);
-		if (delayed) {
-			spin_lock(&ci->i_ceph_lock);
-			__cap_delay_requeue(mdsc, ci);
-			spin_unlock(&ci->i_ceph_lock);
-		}
+		spin_lock(&ci->i_ceph_lock);
+		__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
+		spin_unlock(&ci->i_ceph_lock);
 	}
 }
 
 static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
 				     struct ceph_mds_session *session,
 				     struct inode *inode)
+	__releases(ci->i_ceph_lock)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_cap *cap;
 
-	spin_lock(&ci->i_ceph_lock);
 	cap = ci->i_auth_cap;
 	dout("kick_flushing_inode_caps %p flushing %s\n", inode,
 	     ceph_cap_string(ci->i_flushing_caps));
 
-	__ceph_flush_snaps(ci, &session, 1);
-
-	if (ci->i_flushing_caps) {
-		int delayed;
-
+	if (!list_empty(&ci->i_cap_flush_list)) {
+		u64 oldest_flush_tid;
 		spin_lock(&mdsc->cap_dirty_lock);
 		list_move_tail(&ci->i_flushing_item,
 			       &cap->session->s_cap_flushing);
+		oldest_flush_tid = __get_oldest_flush_tid(mdsc);
 		spin_unlock(&mdsc->cap_dirty_lock);
 
+		__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
 		spin_unlock(&ci->i_ceph_lock);
-
-		delayed = __kick_flushing_caps(mdsc, session, ci);
-		if (delayed) {
-			spin_lock(&ci->i_ceph_lock);
-			__cap_delay_requeue(mdsc, ci);
-			spin_unlock(&ci->i_ceph_lock);
-		}
 	} else {
 		spin_unlock(&ci->i_ceph_lock);
 	}
@@ -2487,12 +2486,11 @@
 {
 	if (!capsnap->need_flush &&
 	    !capsnap->writing && !capsnap->dirty_pages) {
-
 		dout("dropping cap_snap %p follows %llu\n",
 		     capsnap, capsnap->follows);
+		BUG_ON(capsnap->cap_flush.tid > 0);
 		ceph_put_snap_context(capsnap->context);
 		list_del(&capsnap->ci_item);
-		list_del(&capsnap->flushing_item);
 		ceph_put_cap_snap(capsnap);
 		return 1;
 	}
@@ -2891,13 +2889,13 @@
 			fill_inline = true;
 	}
 
-	spin_unlock(&ci->i_ceph_lock);
-
 	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
-		kick_flushing_inode_caps(mdsc, session, inode);
-		up_read(&mdsc->snap_rwsem);
 		if (newcaps & ~issued)
 			wake = true;
+		kick_flushing_inode_caps(mdsc, session, inode);
+		up_read(&mdsc->snap_rwsem);
+	} else {
+		spin_unlock(&ci->i_ceph_lock);
 	}
 
 	if (fill_inline)
@@ -2951,6 +2949,8 @@
 	list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
 		if (cf->tid == flush_tid)
 			cleaned = cf->caps;
+		if (cf->caps == 0) /* capsnap */
+			continue;
 		if (cf->tid <= flush_tid) {
 			list_del(&cf->i_list);
 			list_add_tail(&cf->i_list, &to_remove);
@@ -2985,13 +2985,16 @@
 	}
 
 	if (ci->i_flushing_caps == 0) {
-		list_del_init(&ci->i_flushing_item);
-		if (!list_empty(&session->s_cap_flushing))
-			dout(" mds%d still flushing cap on %p\n",
-			     session->s_mds,
-			     &list_entry(session->s_cap_flushing.next,
-					 struct ceph_inode_info,
-					 i_flushing_item)->vfs_inode);
+		if (list_empty(&ci->i_cap_flush_list)) {
+			list_del_init(&ci->i_flushing_item);
+			if (!list_empty(&session->s_cap_flushing)) {
+				dout(" mds%d still flushing cap on %p\n",
+				     session->s_mds,
+				     &list_first_entry(&session->s_cap_flushing,
+						struct ceph_inode_info,
+						i_flushing_item)->vfs_inode);
+			}
+		}
 		mdsc->num_cap_flushing--;
 		dout(" inode %p now !flushing\n", inode);
 
@@ -3039,7 +3042,7 @@
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	u64 follows = le64_to_cpu(m->snap_follows);
 	struct ceph_cap_snap *capsnap;
-	int drop = 0;
+	int flushed = 0;
 
 	dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n",
 	     inode, ci, session->s_mds, follows);
@@ -3047,30 +3050,47 @@
 	spin_lock(&ci->i_ceph_lock);
 	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
 		if (capsnap->follows == follows) {
-			if (capsnap->flush_tid != flush_tid) {
+			if (capsnap->cap_flush.tid != flush_tid) {
 				dout(" cap_snap %p follows %lld tid %lld !="
 				     " %lld\n", capsnap, follows,
-				     flush_tid, capsnap->flush_tid);
+				     flush_tid, capsnap->cap_flush.tid);
 				break;
 			}
-			WARN_ON(capsnap->dirty_pages || capsnap->writing);
-			dout(" removing %p cap_snap %p follows %lld\n",
-			     inode, capsnap, follows);
-			ceph_put_snap_context(capsnap->context);
-			list_del(&capsnap->ci_item);
-			list_del(&capsnap->flushing_item);
-			ceph_put_cap_snap(capsnap);
-			wake_up_all(&mdsc->cap_flushing_wq);
-			drop = 1;
+			flushed = 1;
 			break;
 		} else {
 			dout(" skipping cap_snap %p follows %lld\n",
 			     capsnap, capsnap->follows);
 		}
 	}
+	if (flushed) {
+		u64 oldest_flush_tid;
+		WARN_ON(capsnap->dirty_pages || capsnap->writing);
+		dout(" removing %p cap_snap %p follows %lld\n",
+		     inode, capsnap, follows);
+		list_del(&capsnap->ci_item);
+		list_del(&capsnap->cap_flush.i_list);
+
+		spin_lock(&mdsc->cap_dirty_lock);
+
+		if (list_empty(&ci->i_cap_flush_list))
+			list_del_init(&ci->i_flushing_item);
+
+		list_del(&capsnap->cap_flush.g_list);
+
+		oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+		if (oldest_flush_tid == 0 || oldest_flush_tid > flush_tid)
+			wake_up_all(&mdsc->cap_flushing_wq);
+
+		spin_unlock(&mdsc->cap_dirty_lock);
+		wake_up_all(&ci->i_cap_wq);
+	}
 	spin_unlock(&ci->i_ceph_lock);
-	if (drop)
+	if (flushed) {
+		ceph_put_snap_context(capsnap->context);
+		ceph_put_cap_snap(capsnap);
 		iput(inode);
+	}
 }
 
 /*
@@ -3175,7 +3195,8 @@
 			tcap->implemented |= issued;
 			if (cap == ci->i_auth_cap)
 				ci->i_auth_cap = tcap;
-			if (ci->i_flushing_caps && ci->i_auth_cap == tcap) {
+			if (!list_empty(&ci->i_cap_flush_list) &&
+			    ci->i_auth_cap == tcap) {
 				spin_lock(&mdsc->cap_dirty_lock);
 				list_move_tail(&ci->i_flushing_item,
 					       &tcap->session->s_cap_flushing);