libceph: support the RADOS copy-from operation

Add support for performing remote object copies using the 'copy-from'
operation.

[ Add COPY_FROM to get_num_data_items(). ]

Signed-off-by: Luis Henriques <lhenriques@suse.com>
Reviewed-by: Ilya Dryomov <idryomov@gmail.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index f3e8284..7a2af50 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -136,6 +136,13 @@ struct ceph_osd_req_op {
 			u64 expected_object_size;
 			u64 expected_write_size;
 		} alloc_hint;
+		struct {
+			u64 snapid;
+			u64 src_version;
+			u8 flags;
+			u32 src_fadvise_flags;
+			struct ceph_osd_data osd_data;
+		} copy_from;
 	};
 };
 
@@ -510,6 +517,16 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 				struct timespec64 *mtime,
 				struct page **pages, int nr_pages);
 
+int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
+			u64 src_snapid, u64 src_version,
+			struct ceph_object_id *src_oid,
+			struct ceph_object_locator *src_oloc,
+			u32 src_fadvise_flags,
+			struct ceph_object_id *dst_oid,
+			struct ceph_object_locator *dst_oloc,
+			u32 dst_fadvise_flags,
+			u8 copy_from_flags);
+
 /* watch/notify */
 struct ceph_osd_linger_request *
 ceph_osdc_watch(struct ceph_osd_client *osdc,
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index f198838..3eb0e556 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -410,6 +410,14 @@ enum {
 enum {
 	CEPH_OSD_OP_FLAG_EXCL = 1,      /* EXCL object create */
 	CEPH_OSD_OP_FLAG_FAILOK = 2,    /* continue despite failure */
+	CEPH_OSD_OP_FLAG_FADVISE_RANDOM	    = 0x4, /* the op is random */
+	CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL = 0x8, /* the op is sequential */
+	CEPH_OSD_OP_FLAG_FADVISE_WILLNEED   = 0x10,/* data will be accessed in
+						      the near future */
+	CEPH_OSD_OP_FLAG_FADVISE_DONTNEED   = 0x20,/* data will not be accessed
+						      in the near future */
+	CEPH_OSD_OP_FLAG_FADVISE_NOCACHE    = 0x40,/* data will be accessed only
+						      once by this client */
 };
 
 #define EOLDSNAPC    ERESTART  /* ORDERSNAP flag set; writer has old snapc*/
@@ -432,6 +440,15 @@ enum {
 };
 
 enum {
+	CEPH_OSD_COPY_FROM_FLAG_FLUSH = 1,       /* part of a flush operation */
+	CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY = 2, /* ignore pool overlay */
+	CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE = 4,   /* ignore osd cache logic */
+	CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE = 8, /* map snap direct to
+						     * cloneid */
+	CEPH_OSD_COPY_FROM_FLAG_RWORDERED = 16,     /* order with write */
+};
+
+enum {
 	CEPH_OSD_WATCH_OP_UNWATCH = 0,
 	CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
 	/* note: use only ODD ids to prevent pre-giant code from
@@ -497,6 +514,17 @@ struct ceph_osd_op {
 			__le64 expected_object_size;
 			__le64 expected_write_size;
 		} __attribute__ ((packed)) alloc_hint;
+		struct {
+			__le64 snapid;
+			__le64 src_version;
+			__u8 flags; /* CEPH_OSD_COPY_FROM_FLAG_* */
+			/*
+			 * CEPH_OSD_OP_FLAG_FADVISE_*: fadvise flags
+			 * for src object, flags for dest object are in
+			 * ceph_osd_op::flags.
+			 */
+			__le32 src_fadvise_flags;
+		} __attribute__ ((packed)) copy_from;
 	};
 	__le32 payload_len;
 } __attribute__ ((packed));
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index a0148de..d23a9f8 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -410,6 +410,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_LIST_WATCHERS:
 		ceph_osd_data_release(&op->list_watchers.response_data);
 		break;
+	case CEPH_OSD_OP_COPY_FROM:
+		ceph_osd_data_release(&op->copy_from.osd_data);
+		break;
 	default:
 		break;
 	}
@@ -702,6 +705,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
 		case CEPH_OSD_OP_SETXATTR:
 		case CEPH_OSD_OP_CMPXATTR:
 		case CEPH_OSD_OP_NOTIFY_ACK:
+		case CEPH_OSD_OP_COPY_FROM:
 			*num_request_data_items += 1;
 			break;
 
@@ -1016,6 +1020,14 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 	case CEPH_OSD_OP_CREATE:
 	case CEPH_OSD_OP_DELETE:
 		break;
+	case CEPH_OSD_OP_COPY_FROM:
+		dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
+		dst->copy_from.src_version =
+			cpu_to_le64(src->copy_from.src_version);
+		dst->copy_from.flags = src->copy_from.flags;
+		dst->copy_from.src_fadvise_flags =
+			cpu_to_le32(src->copy_from.src_fadvise_flags);
+		break;
 	default:
 		pr_err("unsupported osd opcode %s\n",
 			ceph_osd_op_name(src->op));
@@ -1947,6 +1959,10 @@ static void setup_request_data(struct ceph_osd_request *req)
 			ceph_osdc_msg_data_add(request_msg,
 					       &op->notify_ack.request_data);
 			break;
+		case CEPH_OSD_OP_COPY_FROM:
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->copy_from.osd_data);
+			break;
 
 		/* reply */
 		case CEPH_OSD_OP_STAT:
@@ -5255,6 +5271,80 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 }
 EXPORT_SYMBOL(ceph_osdc_writepages);
 
+static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
+				     u64 src_snapid, u64 src_version,
+				     struct ceph_object_id *src_oid,
+				     struct ceph_object_locator *src_oloc,
+				     u32 src_fadvise_flags,
+				     u32 dst_fadvise_flags,
+				     u8 copy_from_flags)
+{
+	struct ceph_osd_req_op *op;
+	struct page **pages;
+	void *p, *end;
+
+	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
+	if (IS_ERR(pages))
+		return PTR_ERR(pages);
+
+	op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags);
+	op->copy_from.snapid = src_snapid;
+	op->copy_from.src_version = src_version;
+	op->copy_from.flags = copy_from_flags;
+	op->copy_from.src_fadvise_flags = src_fadvise_flags;
+
+	p = page_address(pages[0]);
+	end = p + PAGE_SIZE;
+	ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
+	encode_oloc(&p, end, src_oloc);
+	op->indata_len = PAGE_SIZE - (end - p);
+
+	ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
+				 op->indata_len, 0, false, true);
+	return 0;
+}
+
+int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
+			u64 src_snapid, u64 src_version,
+			struct ceph_object_id *src_oid,
+			struct ceph_object_locator *src_oloc,
+			u32 src_fadvise_flags,
+			struct ceph_object_id *dst_oid,
+			struct ceph_object_locator *dst_oloc,
+			u32 dst_fadvise_flags,
+			u8 copy_from_flags)
+{
+	struct ceph_osd_request *req;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
+
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
+
+	ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
+	ceph_oid_copy(&req->r_t.base_oid, dst_oid);
+
+	ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
+					src_oloc, src_fadvise_flags,
+					dst_fadvise_flags, copy_from_flags);
+	if (ret)
+		goto out;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out;
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+
+out:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_copy_from);
+
 int __init ceph_osdc_setup(void)
 {
 	size_t size = sizeof(struct ceph_osd_request) +