ceph: pre-allocate data structure that tracks caps flushing

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 5f53ac0..7edf3c4 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1308,12 +1308,17 @@
 	struct inode *inode = file_inode(vma->vm_file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_file_info *fi = vma->vm_file->private_data;
+	struct ceph_cap_flush *prealloc_cf;
 	struct page *page = vmf->page;
 	loff_t off = page_offset(page);
 	loff_t size = i_size_read(inode);
 	size_t len;
 	int want, got, ret;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return VM_FAULT_SIGBUS;
+
 	if (ci->i_inline_version != CEPH_INLINE_NONE) {
 		struct page *locked_page = NULL;
 		if (off == 0) {
@@ -1323,8 +1328,10 @@
 		ret = ceph_uninline_data(vma->vm_file, locked_page);
 		if (locked_page)
 			unlock_page(locked_page);
-		if (ret < 0)
-			return VM_FAULT_SIGBUS;
+		if (ret < 0) {
+			ret = VM_FAULT_SIGBUS;
+			goto out_free;
+		}
 	}
 
 	if (off + PAGE_CACHE_SIZE <= size)
@@ -1346,7 +1353,8 @@
 			break;
 		if (ret != -ERESTARTSYS) {
 			WARN_ON(1);
-			return VM_FAULT_SIGBUS;
+			ret = VM_FAULT_SIGBUS;
+			goto out_free;
 		}
 	}
 	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
@@ -1381,7 +1389,8 @@
 		int dirty;
 		spin_lock(&ci->i_ceph_lock);
 		ci->i_inline_version = CEPH_INLINE_NONE;
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+					       &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
@@ -1390,6 +1399,8 @@
 	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
 	     inode, off, len, ceph_cap_string(got), ret);
 	ceph_put_cap_refs(ci, got);
+out_free:
+	ceph_free_cap_flush(prealloc_cf);
 
 	return ret;
 }
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 69a16044..dd7b20a 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1356,7 +1356,8 @@
  * Caller is then responsible for calling __mark_inode_dirty with the
  * returned flags value.
  */
-int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
+int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
+			   struct ceph_cap_flush **pcf)
 {
 	struct ceph_mds_client *mdsc =
 		ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
@@ -1376,6 +1377,9 @@
 	     ceph_cap_string(was | mask));
 	ci->i_dirty_caps |= mask;
 	if (was == 0) {
+		WARN_ON_ONCE(ci->i_prealloc_cap_flush);
+		swap(ci->i_prealloc_cap_flush, *pcf);
+
 		if (!ci->i_head_snapc) {
 			WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
 			ci->i_head_snapc = ceph_get_snap_context(
@@ -1391,6 +1395,8 @@
 			ihold(inode);
 			dirty |= I_DIRTY_SYNC;
 		}
+	} else {
+		WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
 	}
 	BUG_ON(list_empty(&ci->i_dirty_item));
 	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
@@ -1446,6 +1452,17 @@
 	rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree);
 }
 
+struct ceph_cap_flush *ceph_alloc_cap_flush(void)
+{
+	return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
+}
+
+void ceph_free_cap_flush(struct ceph_cap_flush *cf)
+{
+	if (cf)
+		kmem_cache_free(ceph_cap_flush_cachep, cf);
+}
+
 static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
 {
 	struct rb_node *n = rb_first(&mdsc->cap_flush_tree);
@@ -1469,11 +1486,12 @@
 {
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_cap_flush *cf;
+	struct ceph_cap_flush *cf = NULL;
 	int flushing;
 
 	BUG_ON(ci->i_dirty_caps == 0);
 	BUG_ON(list_empty(&ci->i_dirty_item));
+	BUG_ON(!ci->i_prealloc_cap_flush);
 
 	flushing = ci->i_dirty_caps;
 	dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n",
@@ -1484,7 +1502,7 @@
 	ci->i_dirty_caps = 0;
 	dout(" inode %p now !dirty\n", inode);
 
-	cf = kmalloc(sizeof(*cf), GFP_ATOMIC);
+	swap(cf, ci->i_prealloc_cap_flush);
 	cf->caps = flushing;
 	cf->kick = false;
 
@@ -3075,7 +3093,7 @@
 		cf = list_first_entry(&to_remove,
 				      struct ceph_cap_flush, list);
 		list_del(&cf->list);
-		kfree(cf);
+		ceph_free_cap_flush(cf);
 	}
 	if (drop)
 		iput(inode);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 0a76a37..8a4eb4d 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -939,6 +939,7 @@
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_osd_client *osdc =
 		&ceph_sb_to_client(inode->i_sb)->client->osdc;
+	struct ceph_cap_flush *prealloc_cf;
 	ssize_t count, written = 0;
 	int err, want, got;
 	loff_t pos;
@@ -946,6 +947,10 @@
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EROFS;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
 	mutex_lock(&inode->i_mutex);
 
 	/* We can write back this queue in page reclaim */
@@ -1050,7 +1055,8 @@
 		int dirty;
 		spin_lock(&ci->i_ceph_lock);
 		ci->i_inline_version = CEPH_INLINE_NONE;
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+					       &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
@@ -1074,6 +1080,7 @@
 out:
 	mutex_unlock(&inode->i_mutex);
 out_unlocked:
+	ceph_free_cap_flush(prealloc_cf);
 	current->backing_dev_info = NULL;
 	return written ? written : err;
 }
@@ -1270,6 +1277,7 @@
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_osd_client *osdc =
 		&ceph_inode_to_client(inode)->client->osdc;
+	struct ceph_cap_flush *prealloc_cf;
 	int want, got = 0;
 	int dirty;
 	int ret = 0;
@@ -1282,6 +1290,10 @@
 	if (!S_ISREG(inode->i_mode))
 		return -EOPNOTSUPP;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
 	mutex_lock(&inode->i_mutex);
 
 	if (ceph_snap(inode) != CEPH_NOSNAP) {
@@ -1328,7 +1340,8 @@
 	if (!ret) {
 		spin_lock(&ci->i_ceph_lock);
 		ci->i_inline_version = CEPH_INLINE_NONE;
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+					       &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
@@ -1337,6 +1350,7 @@
 	ceph_put_cap_refs(ci, got);
 unlock:
 	mutex_unlock(&inode->i_mutex);
+	ceph_free_cap_flush(prealloc_cf);
 	return ret;
 }
 
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 3326302f..e86d1a4 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -416,6 +416,7 @@
 	ci->i_flushing_caps = 0;
 	INIT_LIST_HEAD(&ci->i_dirty_item);
 	INIT_LIST_HEAD(&ci->i_flushing_item);
+	ci->i_prealloc_cap_flush = NULL;
 	ci->i_cap_flush_tree = RB_ROOT;
 	init_waitqueue_head(&ci->i_cap_wq);
 	ci->i_hold_caps_min = 0;
@@ -1720,6 +1721,7 @@
 	const unsigned int ia_valid = attr->ia_valid;
 	struct ceph_mds_request *req;
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
+	struct ceph_cap_flush *prealloc_cf;
 	int issued;
 	int release = 0, dirtied = 0;
 	int mask = 0;
@@ -1734,10 +1736,16 @@
 	if (err != 0)
 		return err;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETATTR,
 				       USE_AUTH_MDS);
-	if (IS_ERR(req))
+	if (IS_ERR(req)) {
+		ceph_free_cap_flush(prealloc_cf);
 		return PTR_ERR(req);
+	}
 
 	spin_lock(&ci->i_ceph_lock);
 	issued = __ceph_caps_issued(ci, NULL);
@@ -1895,7 +1903,8 @@
 		dout("setattr %p ATTR_FILE ... hrm!\n", inode);
 
 	if (dirtied) {
-		inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied);
+		inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
+							   &prealloc_cf);
 		inode->i_ctime = CURRENT_TIME;
 	}
 
@@ -1927,9 +1936,11 @@
 	ceph_mdsc_put_request(req);
 	if (mask & CEPH_SETATTR_SIZE)
 		__ceph_do_pending_vmtruncate(inode);
+	ceph_free_cap_flush(prealloc_cf);
 	return err;
 out_put:
 	ceph_mdsc_put_request(req);
+	ceph_free_cap_flush(prealloc_cf);
 	return err;
 }
 
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 89e4305..8d73fe9 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1189,6 +1189,10 @@
 		}
 		spin_unlock(&mdsc->cap_dirty_lock);
 
+		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
+			list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
+			ci->i_prealloc_cap_flush = NULL;
+		}
 	}
 	spin_unlock(&ci->i_ceph_lock);
 	while (!list_empty(&to_remove)) {
@@ -1196,7 +1200,7 @@
 		cf = list_first_entry(&to_remove,
 				      struct ceph_cap_flush, list);
 		list_del(&cf->list);
-		kfree(cf);
+		ceph_free_cap_flush(cf);
 	}
 	while (drop--)
 		iput(inode);
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index edeb83c..d1c833c 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -622,6 +622,7 @@
  */
 struct kmem_cache *ceph_inode_cachep;
 struct kmem_cache *ceph_cap_cachep;
+struct kmem_cache *ceph_cap_flush_cachep;
 struct kmem_cache *ceph_dentry_cachep;
 struct kmem_cache *ceph_file_cachep;
 
@@ -647,6 +648,10 @@
 				     SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
 	if (ceph_cap_cachep == NULL)
 		goto bad_cap;
+	ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush,
+					   SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
+	if (ceph_cap_flush_cachep == NULL)
+		goto bad_cap_flush;
 
 	ceph_dentry_cachep = KMEM_CACHE(ceph_dentry_info,
 					SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
@@ -665,6 +670,8 @@
 bad_file:
 	kmem_cache_destroy(ceph_dentry_cachep);
 bad_dentry:
+	kmem_cache_destroy(ceph_cap_flush_cachep);
+bad_cap_flush:
 	kmem_cache_destroy(ceph_cap_cachep);
 bad_cap:
 	kmem_cache_destroy(ceph_inode_cachep);
@@ -681,6 +688,7 @@
 
 	kmem_cache_destroy(ceph_inode_cachep);
 	kmem_cache_destroy(ceph_cap_cachep);
+	kmem_cache_destroy(ceph_cap_flush_cachep);
 	kmem_cache_destroy(ceph_dentry_cachep);
 	kmem_cache_destroy(ceph_file_cachep);
 
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index e7f13f7..4415e97 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -309,6 +309,7 @@
 	/* we need to track cap writeback on a per-cap-bit basis, to allow
 	 * overlapping, pipelined cap flushes to the mds.  we can probably
 	 * reduce the tid to 8 bits if we're concerned about inode size. */
+	struct ceph_cap_flush *i_prealloc_cap_flush;
 	struct rb_root i_cap_flush_tree;
 	wait_queue_head_t i_cap_wq;      /* threads waiting on a capability */
 	unsigned long i_hold_caps_min; /* jiffies */
@@ -578,7 +579,10 @@
 {
 	return ci->i_dirty_caps | ci->i_flushing_caps;
 }
-extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask);
+extern struct ceph_cap_flush *ceph_alloc_cap_flush(void);
+extern void ceph_free_cap_flush(struct ceph_cap_flush *cf);
+extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
+				  struct ceph_cap_flush **pcf);
 
 extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
 				      struct ceph_cap *ocap, int mask);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index c6f7d9b..819163d 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -912,6 +912,7 @@
 	struct ceph_vxattr *vxattr;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
+	struct ceph_cap_flush *prealloc_cf = NULL;
 	int issued;
 	int err;
 	int dirty = 0;
@@ -950,6 +951,10 @@
 	if (!xattr)
 		goto out;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		goto out;
+
 	spin_lock(&ci->i_ceph_lock);
 retry:
 	issued = __ceph_caps_issued(ci, NULL);
@@ -991,7 +996,8 @@
 			  flags, value ? 1 : -1, &xattr);
 
 	if (!err) {
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
+					       &prealloc_cf);
 		ci->i_xattrs.dirty = true;
 		inode->i_ctime = CURRENT_TIME;
 	}
@@ -1001,6 +1007,7 @@
 		up_read(&mdsc->snap_rwsem);
 	if (dirty)
 		__mark_inode_dirty(inode, dirty);
+	ceph_free_cap_flush(prealloc_cf);
 	return err;
 
 do_sync:
@@ -1010,6 +1017,7 @@
 		up_read(&mdsc->snap_rwsem);
 	err = ceph_sync_setxattr(dentry, name, value, size, flags);
 out:
+	ceph_free_cap_flush(prealloc_cf);
 	kfree(newname);
 	kfree(newval);
 	kfree(xattr);
@@ -1062,6 +1070,7 @@
 	struct ceph_vxattr *vxattr;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
+	struct ceph_cap_flush *prealloc_cf = NULL;
 	int issued;
 	int err;
 	int required_blob_size;
@@ -1079,6 +1088,10 @@
 	if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN))
 		goto do_sync_unlocked;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
 	err = -ENOMEM;
 	spin_lock(&ci->i_ceph_lock);
 retry:
@@ -1120,7 +1133,8 @@
 
 	err = __remove_xattr_by_name(ceph_inode(inode), name);
 
-	dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
+	dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
+				       &prealloc_cf);
 	ci->i_xattrs.dirty = true;
 	inode->i_ctime = CURRENT_TIME;
 	spin_unlock(&ci->i_ceph_lock);
@@ -1128,12 +1142,14 @@
 		up_read(&mdsc->snap_rwsem);
 	if (dirty)
 		__mark_inode_dirty(inode, dirty);
+	ceph_free_cap_flush(prealloc_cf);
 	return err;
 do_sync:
 	spin_unlock(&ci->i_ceph_lock);
 do_sync_unlocked:
 	if (lock_snap_rwsem)
 		up_read(&mdsc->snap_rwsem);
+	ceph_free_cap_flush(prealloc_cf);
 	err = ceph_send_removexattr(dentry, name);
 	return err;
 }
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index d73a569..9ebee53 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -174,6 +174,7 @@
 
 extern struct kmem_cache *ceph_inode_cachep;
 extern struct kmem_cache *ceph_cap_cachep;
+extern struct kmem_cache *ceph_cap_flush_cachep;
 extern struct kmem_cache *ceph_dentry_cachep;
 extern struct kmem_cache *ceph_file_cachep;