ceph: limit osd read size to CEPH_MSG_MAX_DATA_LEN

libceph returns -EIO when read size > CEPH_MSG_MAX_DATA_LEN.

Link: http://tracker.ceph.com/issues/20528
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 1bc709f..63ca173 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -455,13 +455,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 	if (rc == 0)
 		goto out;
 
-	if (fsc->mount_options->rsize >= PAGE_SIZE)
-		max = (fsc->mount_options->rsize + PAGE_SIZE - 1)
-			>> PAGE_SHIFT;
-
-	dout("readpages %p file %p nr_pages %d max %d\n", inode,
-		file, nr_pages,
-	     max);
+	max = fsc->mount_options->rsize >> PAGE_SHIFT;
+	dout("readpages %p file %p nr_pages %d max %d\n",
+	     inode, file, nr_pages, max);
 	while (!list_empty(page_list)) {
 		rc = start_read(inode, page_list, max);
 		if (rc < 0)
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 3d48c41..85f0dba 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -887,6 +887,9 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 			break;
 		}
 
+		if (!write)
+			size = min_t(u64, size, fsc->mount_options->rsize);
+
 		len = size;
 		pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
 		if (IS_ERR(pages)) {
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 280311e..2b2a260 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -236,7 +236,9 @@ static int parse_fsopt_token(char *c, void *private)
 		fsopt->wsize = intval;
 		break;
 	case Opt_rsize:
-		fsopt->rsize = intval;
+		if (intval < PAGE_SIZE || intval > CEPH_MAX_READ_SIZE)
+			return -EINVAL;
+		fsopt->rsize = ALIGN(intval, PAGE_SIZE);
 		break;
 	case Opt_rasize:
 		fsopt->rasize = intval;
@@ -390,7 +392,7 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
 	fsopt->sb_flags = flags;
 	fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
 
-	fsopt->rsize = CEPH_RSIZE_DEFAULT;
+	fsopt->rsize = CEPH_MAX_READ_SIZE;
 	fsopt->rasize = CEPH_RASIZE_DEFAULT;
 	fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
 	if (!fsopt->snapdir_name) {
@@ -505,7 +507,7 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 		seq_printf(m, ",mds_namespace=%s", fsopt->mds_namespace);
 	if (fsopt->wsize)
 		seq_printf(m, ",wsize=%d", fsopt->wsize);
-	if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
+	if (fsopt->rsize != CEPH_MAX_READ_SIZE)
 		seq_printf(m, ",rsize=%d", fsopt->rsize);
 	if (fsopt->rasize != CEPH_RASIZE_DEFAULT)
 		seq_printf(m, ",rasize=%d", fsopt->rasize);
@@ -948,13 +950,8 @@ static int ceph_setup_bdi(struct super_block *sb, struct ceph_fs_client *fsc)
 	else
 		sb->s_bdi->ra_pages = VM_MAX_READAHEAD * 1024 / PAGE_SIZE;
 
-	if (fsc->mount_options->rsize > fsc->mount_options->rasize &&
-	    fsc->mount_options->rsize >= PAGE_SIZE)
-		sb->s_bdi->io_pages =
-			(fsc->mount_options->rsize + PAGE_SIZE - 1)
-			>> PAGE_SHIFT;
-	else if (fsc->mount_options->rsize == 0)
-		sb->s_bdi->io_pages = ULONG_MAX;
+	/* set io_pages based on max osd read size */
+	sb->s_bdi->io_pages = fsc->mount_options->rsize >> PAGE_SHIFT;
 
 	return 0;
 }
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index da036d0..2b1b021 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -46,7 +46,8 @@
 #define ceph_test_mount_opt(fsc, opt) \
 	(!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt))
 
-#define CEPH_RSIZE_DEFAULT              (64*1024*1024) /* max read size */
+/* max size of osd read request, limited by libceph */
+#define CEPH_MAX_READ_SIZE              CEPH_MSG_MAX_DATA_LEN
 #define CEPH_RASIZE_DEFAULT             (8192*1024)    /* max readahead */
 #define CEPH_MAX_READDIR_DEFAULT        1024
 #define CEPH_MAX_READDIR_BYTES_DEFAULT  (512*1024)