drbd: introduce WRITE_SAME support

We will support WRITE_SAME, if
 * all peers support WRITE_SAME (both in kernel and DRBD version),
 * all peer devices support WRITE_SAME
 * logical_block_size is identical on all peers.

We may at some point introduce a fallback on the receiving side
for devices/kernels that do not support WRITE_SAME,
by open-coding a submit loop. But not yet.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Signed-off-by: Jens Axboe <axboe@fb.com>
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 64e9525..f4ea8d6 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -920,6 +920,31 @@
 	}
 }
 
+/* communicated if (agreed_features & DRBD_FF_WSAME) */
+void assign_p_sizes_qlim(struct drbd_device *device, struct p_sizes *p, struct request_queue *q)
+{
+	if (q) {
+		p->qlim->physical_block_size = cpu_to_be32(queue_physical_block_size(q));
+		p->qlim->logical_block_size = cpu_to_be32(queue_logical_block_size(q));
+		p->qlim->alignment_offset = cpu_to_be32(queue_alignment_offset(q));
+		p->qlim->io_min = cpu_to_be32(queue_io_min(q));
+		p->qlim->io_opt = cpu_to_be32(queue_io_opt(q));
+		p->qlim->discard_enabled = blk_queue_discard(q);
+		p->qlim->discard_zeroes_data = queue_discard_zeroes_data(q);
+		p->qlim->write_same_capable = !!q->limits.max_write_same_sectors;
+	} else {
+		q = device->rq_queue;
+		p->qlim->physical_block_size = cpu_to_be32(queue_physical_block_size(q));
+		p->qlim->logical_block_size = cpu_to_be32(queue_logical_block_size(q));
+		p->qlim->alignment_offset = 0;
+		p->qlim->io_min = cpu_to_be32(queue_io_min(q));
+		p->qlim->io_opt = cpu_to_be32(queue_io_opt(q));
+		p->qlim->discard_enabled = 0;
+		p->qlim->discard_zeroes_data = 0;
+		p->qlim->write_same_capable = 0;
+	}
+}
+
 int drbd_send_sizes(struct drbd_peer_device *peer_device, int trigger_reply, enum dds_flags flags)
 {
 	struct drbd_device *device = peer_device->device;
@@ -928,29 +953,37 @@
 	sector_t d_size, u_size;
 	int q_order_type;
 	unsigned int max_bio_size;
+	unsigned int packet_size;
 
+	sock = &peer_device->connection->data;
+	p = drbd_prepare_command(peer_device, sock);
+	if (!p)
+		return -EIO;
+
+	packet_size = sizeof(*p);
+	if (peer_device->connection->agreed_features & DRBD_FF_WSAME)
+		packet_size += sizeof(p->qlim[0]);
+
+	memset(p, 0, packet_size);
 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
-		D_ASSERT(device, device->ldev->backing_bdev);
+		struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
 		d_size = drbd_get_max_capacity(device->ldev);
 		rcu_read_lock();
 		u_size = rcu_dereference(device->ldev->disk_conf)->disk_size;
 		rcu_read_unlock();
 		q_order_type = drbd_queue_order_type(device);
-		max_bio_size = queue_max_hw_sectors(device->ldev->backing_bdev->bd_disk->queue) << 9;
+		max_bio_size = queue_max_hw_sectors(q) << 9;
 		max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE);
+		assign_p_sizes_qlim(device, p, q);
 		put_ldev(device);
 	} else {
 		d_size = 0;
 		u_size = 0;
 		q_order_type = QUEUE_ORDERED_NONE;
 		max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
+		assign_p_sizes_qlim(device, p, NULL);
 	}
 
-	sock = &peer_device->connection->data;
-	p = drbd_prepare_command(peer_device, sock);
-	if (!p)
-		return -EIO;
-
 	if (peer_device->connection->agreed_pro_version <= 94)
 		max_bio_size = min(max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
 	else if (peer_device->connection->agreed_pro_version < 100)
@@ -962,7 +995,8 @@
 	p->max_bio_size = cpu_to_be32(max_bio_size);
 	p->queue_order_type = cpu_to_be16(q_order_type);
 	p->dds_flags = cpu_to_be16(flags);
-	return drbd_send_command(peer_device, sock, P_SIZES, sizeof(*p), NULL, 0);
+
+	return drbd_send_command(peer_device, sock, P_SIZES, packet_size, NULL, 0);
 }
 
 /**
@@ -1577,6 +1611,9 @@
 					 ? 0 : MSG_MORE);
 		if (err)
 			return err;
+		/* REQ_OP_WRITE_SAME has only one segment */
+		if (bio_op(bio) == REQ_OP_WRITE_SAME)
+			break;
 	}
 	return 0;
 }
@@ -1595,6 +1632,9 @@
 				      bio_iter_last(bvec, iter) ? 0 : MSG_MORE);
 		if (err)
 			return err;
+		/* REQ_OP_WRITE_SAME has only one segment */
+		if (bio_op(bio) == REQ_OP_WRITE_SAME)
+			break;
 	}
 	return 0;
 }
@@ -1626,6 +1666,7 @@
 		return  (bio->bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
 			(bio->bi_rw & REQ_FUA ? DP_FUA : 0) |
 			(bio->bi_rw & REQ_PREFLUSH ? DP_FLUSH : 0) |
+			(bio_op(bio) == REQ_OP_WRITE_SAME ? DP_WSAME : 0) |
 			(bio_op(bio) == REQ_OP_DISCARD ? DP_DISCARD : 0);
 	else
 		return bio->bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
@@ -1639,6 +1680,8 @@
 	struct drbd_device *device = peer_device->device;
 	struct drbd_socket *sock;
 	struct p_data *p;
+	struct p_wsame *wsame = NULL;
+	void *digest_out;
 	unsigned int dp_flags = 0;
 	int digest_size;
 	int err;
@@ -1674,12 +1717,29 @@
 		err = __send_command(peer_device->connection, device->vnr, sock, P_TRIM, sizeof(*t), NULL, 0);
 		goto out;
 	}
+	if (dp_flags & DP_WSAME) {
+		/* this will only work if DRBD_FF_WSAME is set AND the
+		 * handshake agreed that all nodes and backend devices are
+		 * WRITE_SAME capable and agree on logical_block_size */
+		wsame = (struct p_wsame*)p;
+		digest_out = wsame + 1;
+		wsame->size = cpu_to_be32(req->i.size);
+	} else
+		digest_out = p + 1;
 
 	/* our digest is still only over the payload.
 	 * TRIM does not carry any payload. */
 	if (digest_size)
-		drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, p + 1);
-	err = __send_command(peer_device->connection, device->vnr, sock, P_DATA, sizeof(*p) + digest_size, NULL, req->i.size);
+		drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest_out);
+	if (wsame) {
+		err =
+		    __send_command(peer_device->connection, device->vnr, sock, P_WSAME,
+				   sizeof(*wsame) + digest_size, NULL,
+				   bio_iovec(req->master_bio).bv_len);
+	} else
+		err =
+		    __send_command(peer_device->connection, device->vnr, sock, P_DATA,
+				   sizeof(*p) + digest_size, NULL, req->i.size);
 	if (!err) {
 		/* For protocol A, we have to memcpy the payload into
 		 * socket buffers, as we may complete right away
@@ -3660,6 +3720,8 @@
 	 * one PRO_VERSION */
 	static const char *cmdnames[] = {
 		[P_DATA]	        = "Data",
+		[P_WSAME]	        = "WriteSame",
+		[P_TRIM]	        = "Trim",
 		[P_DATA_REPLY]	        = "DataReply",
 		[P_RS_DATA_REPLY]	= "RSDataReply",
 		[P_BARRIER]	        = "Barrier",