libceph: enable large, variable-sized OSD requests

Turn r_ops into a flexible array member to enable large OSD requests
consisting of up to 16 ops.  The use case is scattered writeback in
cephfs; as far as the kernel client is concerned, 16 is just a made-up
number.
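
The struct and constant changes live in the osd_client header and are
not part of the hunks below.  A minimal sketch of the layout the
allocation paths rely on (constant values taken from this message,
surrounding fields elided):

	#define CEPH_OSD_SLAB_OPS	2	/* fits slab cache and mempool */
	#define CEPH_OSD_MAX_OPS	16	/* hard cap on ops per request */

	struct ceph_osd_request {
		/* ... fixed-size fields, including r_num_ops ... */

		/* must stay last - tail is sized at allocation time */
		struct ceph_osd_req_op r_ops[];
	};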

r_ops had size 3 for copyup+hint+write, but copyup is really a special
case - it can only happen once.  ceph_osd_request_cache is therefore
stuffed with num_ops=2 requests; anything bigger than that is allocated
with kmalloc().  req_mempool is backed by ceph_osd_request_cache, which
means either num_ops=1 or num_ops=2 for use_mempool=true - all existing
users (ceph_writepages_start(), ceph_osdc_writepages()) are fine with
that.
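
For illustration, a hedged caller-side sketch (the cephfs scattered
writeback call site is not part of this patch): a 16-op request must
not come from the mempool, so it takes the kmalloc() path and is later
kfree()'d by the release path below:

	req = ceph_osdc_alloc_request(osdc, NULL, 16, false, GFP_NOFS);
	if (!req)
		return -ENOMEM;
	/* ... init ops, set up the target, submit ... */
	ceph_osdc_put_request(req);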

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index f93d0e8..ccd4e03 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,9 +338,10 @@
 	ceph_put_snap_context(req->r_snapc);
 	if (req->r_mempool)
 		mempool_free(req, req->r_osdc->req_mempool);
-	else
+	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
 		kmem_cache_free(ceph_osd_request_cache, req);
-
+	else
+		kfree(req);
 }
 
 void ceph_osdc_get_request(struct ceph_osd_request *req)
@@ -369,18 +370,22 @@
 	struct ceph_msg *msg;
 	size_t msg_size;
 
-	BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
-	BUG_ON(num_ops > CEPH_OSD_MAX_OP);
-
 	if (use_mempool) {
+		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
-		memset(req, 0, sizeof(*req));
+	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
+		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
 	} else {
-		req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
+		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
+		req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
+			      gfp_flags);
 	}
-	if (req == NULL)
+	if (unlikely(!req))
 		return NULL;
 
+	/* req only, each op is zeroed in _osd_req_op_init() */
+	memset(req, 0, sizeof(*req));
+
 	req->r_osdc = osdc;
 	req->r_mempool = use_mempool;
 	req->r_num_ops = num_ops;
@@ -398,12 +403,19 @@
 	req->r_base_oloc.pool = -1;
 	req->r_target_oloc.pool = -1;
 
+	msg_size = OSD_OPREPLY_FRONT_LEN;
+	if (num_ops > CEPH_OSD_SLAB_OPS) {
+		/* ceph_osd_op and rval */
+		msg_size += (num_ops - CEPH_OSD_SLAB_OPS) *
+			    (sizeof(struct ceph_osd_op) + 4);
+	}
+
 	/* create reply message */
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
 	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
-				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
+		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size,
+				   gfp_flags, true);
 	if (!msg) {
 		ceph_osdc_put_request(req);
 		return NULL;
@@ -1811,7 +1823,7 @@
 
 	ceph_decode_need(&p, end, 4, bad_put);
 	numops = ceph_decode_32(&p);
-	if (numops > CEPH_OSD_MAX_OP)
+	if (numops > CEPH_OSD_MAX_OPS)
 		goto bad_put;
 	if (numops != req->r_num_ops)
 		goto bad_put;
@@ -2784,11 +2796,12 @@
 
 int ceph_osdc_setup(void)
 {
+	size_t size = sizeof(struct ceph_osd_request) +
+	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
+
 	BUG_ON(ceph_osd_request_cache);
-	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
-					sizeof (struct ceph_osd_request),
-					__alignof__(struct ceph_osd_request),
-					0, NULL);
+	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
+						   0, 0, NULL);
 
 	return ceph_osd_request_cache ? 0 : -ENOMEM;
 }