ceph: sync read inline data

we can't use getattr to fetch inline data while holding Fr cap,
because it can cause deadlock. If we need to sync read inline data,
drop cap refs first, then use getattr to fetch inline data.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 5d2b88e..13413d7 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -192,17 +192,30 @@
 	struct ceph_osd_client *osdc =
 		&ceph_inode_to_client(inode)->client->osdc;
 	int err = 0;
+	u64 off = page_offset(page);
 	u64 len = PAGE_CACHE_SIZE;
 
-	err = ceph_readpage_from_fscache(inode, page);
+	if (off >= i_size_read(inode)) {
+		zero_user_segment(page, err, PAGE_CACHE_SIZE);
+		SetPageUptodate(page);
+		return 0;
+	}
 
+	/*
+	 * Uptodate inline data should have been added into page cache
+	 * while getting Fcr caps.
+	 */
+	if (ci->i_inline_version != CEPH_INLINE_NONE)
+		return -EINVAL;
+
+	err = ceph_readpage_from_fscache(inode, page);
 	if (err == 0)
 		goto out;
 
 	dout("readpage inode %p file %p page %p index %lu\n",
 	     inode, filp, page, page->index);
 	err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
-				  (u64) page_offset(page), &len,
+				  off, &len,
 				  ci->i_truncate_seq, ci->i_truncate_size,
 				  &page, 1, 0);
 	if (err == -ENOENT)
@@ -384,6 +397,9 @@
 	int rc = 0;
 	int max = 0;
 
+	if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
+		return -EINVAL;
+
 	rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
 					 &nr_pages);
 
@@ -1219,8 +1235,8 @@
 		want = CEPH_CAP_FILE_CACHE;
 	while (1) {
 		got = 0;
-		ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1,
-				    &got, &pinned_page);
+		ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want,
+				    -1, &got, &pinned_page);
 		if (ret == 0)
 			break;
 		if (ret != -ERESTARTSYS) {
@@ -1231,7 +1247,11 @@
 	dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
 	     inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
 
-	ret = filemap_fault(vma, vmf);
+	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
+	    ci->i_inline_version == CEPH_INLINE_NONE)
+		ret = filemap_fault(vma, vmf);
+	else
+		ret = -EAGAIN;
 
 	dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
 	     inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
@@ -1239,6 +1259,42 @@
 		page_cache_release(pinned_page);
 	ceph_put_cap_refs(ci, got);
 
+	if (ret != -EAGAIN)
+		return ret;
+
+	/* read inline data */
+	if (off >= PAGE_CACHE_SIZE) {
+		/* does not support inline data > PAGE_SIZE */
+		ret = VM_FAULT_SIGBUS;
+	} else {
+		int ret1;
+		struct address_space *mapping = inode->i_mapping;
+		struct page *page = find_or_create_page(mapping, 0,
+						mapping_gfp_mask(mapping) &
+						~__GFP_FS);
+		if (!page) {
+			ret = VM_FAULT_OOM;
+			goto out;
+		}
+		ret1 = __ceph_do_getattr(inode, page,
+					 CEPH_STAT_CAP_INLINE_DATA, true);
+		if (ret1 < 0 || off >= i_size_read(inode)) {
+			unlock_page(page);
+			page_cache_release(page);
+			ret = VM_FAULT_SIGBUS;
+			goto out;
+		}
+		if (ret1 < PAGE_CACHE_SIZE)
+			zero_user_segment(page, ret1, PAGE_CACHE_SIZE);
+		else
+			flush_dcache_page(page);
+		SetPageUptodate(page);
+		vmf->page = page;
+		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
+	}
+out:
+	dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
+	     inode, off, (size_t)PAGE_CACHE_SIZE, ret);
 	return ret;
 }
 
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 861b995..5b092bd 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -333,6 +333,11 @@
 	return 0;
 }
 
+enum {
+	CHECK_EOF = 1,
+	READ_INLINE = 2,
+};
+
 /*
  * Read a range of bytes striped over one or more objects.  Iterate over
  * objects we stripe over.  (That's not atomic, but good enough for now.)
@@ -412,7 +417,7 @@
 		ret = read;
 		/* did we bounce off eof? */
 		if (pos + left > inode->i_size)
-			*checkeof = 1;
+			*checkeof = CHECK_EOF;
 	}
 
 	dout("striped_read returns %d\n", ret);
@@ -808,7 +813,7 @@
 	struct page *pinned_page = NULL;
 	ssize_t ret;
 	int want, got = 0;
-	int checkeof = 0, read = 0;
+	int retry_op = 0, read = 0;
 
 again:
 	dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
@@ -830,8 +835,12 @@
 		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
 		     ceph_cap_string(got));
 
-		/* hmm, this isn't really async... */
-		ret = ceph_sync_read(iocb, to, &checkeof);
+		if (ci->i_inline_version == CEPH_INLINE_NONE) {
+			/* hmm, this isn't really async... */
+			ret = ceph_sync_read(iocb, to, &retry_op);
+		} else {
+			retry_op = READ_INLINE;
+		}
 	} else {
 		dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
 		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
@@ -846,12 +855,50 @@
 		pinned_page = NULL;
 	}
 	ceph_put_cap_refs(ci, got);
+	if (retry_op && ret >= 0) {
+		int statret;
+		struct page *page = NULL;
+		loff_t i_size;
+		if (retry_op == READ_INLINE) {
+			page = __page_cache_alloc(GFP_NOFS);
+			if (!page)
+				return -ENOMEM;
+		}
 
-	if (checkeof && ret >= 0) {
-		int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
+		statret = __ceph_do_getattr(inode, page,
+					    CEPH_STAT_CAP_INLINE_DATA, !!page);
+		if (statret < 0) {
+			 __free_page(page);
+			if (statret == -ENODATA) {
+				BUG_ON(retry_op != READ_INLINE);
+				goto again;
+			}
+			return statret;
+		}
+
+		i_size = i_size_read(inode);
+		if (retry_op == READ_INLINE) {
+			/* does not support inline data > PAGE_SIZE */
+			if (i_size > PAGE_CACHE_SIZE) {
+				ret = -EIO;
+			} else if (iocb->ki_pos < i_size) {
+				loff_t end = min_t(loff_t, i_size,
+						   iocb->ki_pos + len);
+				if (statret < end)
+					zero_user_segment(page, statret, end);
+				ret = copy_page_to_iter(page,
+						iocb->ki_pos & ~PAGE_MASK,
+						end - iocb->ki_pos, to);
+				iocb->ki_pos += ret;
+			} else {
+				ret = 0;
+			}
+			__free_pages(page, 0);
+			return ret;
+		}
 
 		/* hit EOF or hole? */
-		if (statret == 0 && iocb->ki_pos < inode->i_size &&
+		if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
 			ret < len) {
 			dout("sync_read hit hole, ppos %lld < size %lld"
 			     ", reading more\n", iocb->ki_pos,
@@ -859,7 +906,7 @@
 
 			read += ret;
 			len -= ret;
-			checkeof = 0;
+			retry_op = 0;
 			goto again;
 		}
 	}