blob: c42f3058abc79de903b979b2a1fe9ea6dbc74ed0 [file] [log] [blame]
Yehuda Sadeh602adf42010-08-12 16:11:25 -07001/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24 Instructions for use
25 --------------------
26
27 1) Map a Linux block device to an existing rbd image.
28
29 Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
30
31 $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add
32
33 The snapshot name can be "-" or omitted to map the image read/write.
34
35 2) List all active blkdev<->object mappings.
36
37 In this example, we have performed step #1 twice, creating two blkdevs,
38 mapped to two separate rados objects in the rados rbd pool
39
40 $ cat /sys/class/rbd/list
41 #id major client_name pool name snap KB
42 0 254 client4143 rbd foo - 1024000
43
44 The columns, in order, are:
45 - blkdev unique id
46 - blkdev assigned major
47 - rados client id
48 - rados pool name
49 - rados block device name
50 - mapped snapshot ("-" if none)
51 - device size in KB
52
53
54 3) Create a snapshot.
55
56 Usage: <blkdev id> <snapname>
57
58 $ echo "0 mysnap" > /sys/class/rbd/snap_create
59
60
61 4) Listing a snapshot.
62
63 $ cat /sys/class/rbd/snaps_list
64 #id snap KB
65 0 - 1024000 (*)
66 0 foo 1024000
67
68 The columns, in order, are:
69 - blkdev unique id
70 - snapshot name, '-' means none (active read/write version)
71 - size of device at time of snapshot
72 - the (*) indicates this is the active version
73
74 5) Rollback to snapshot.
75
76 Usage: <blkdev id> <snapname>
77
78 $ echo "0 mysnap" > /sys/class/rbd/snap_rollback
79
80
81 6) Mapping an image using snapshot.
82
83 A snapshot mapping is read-only. This is done by passing
84 snap=<snapname> in the options when adding a device.
85
86 $ echo "192.168.0.1 name=admin,snap=mysnap rbd foo" > /sys/class/rbd/add
87
88
89 7) Remove an active blkdev<->rbd image mapping.
90
91 In this example, we remove the mapping with blkdev unique id 1.
92
93 $ echo 1 > /sys/class/rbd/remove
94
95
96 NOTE: The actual creation and deletion of rados objects is outside the scope
97 of this driver.
98
99 */
100
101#include <linux/ceph/libceph.h>
102#include <linux/ceph/osd_client.h>
103#include <linux/ceph/mon_client.h>
104#include <linux/ceph/decode.h>
105
106#include <linux/kernel.h>
107#include <linux/device.h>
108#include <linux/module.h>
109#include <linux/fs.h>
110#include <linux/blkdev.h>
111
112#include "rbd_types.h"
113
/* short driver name; also the prefix of each gendisk name ("rbd%d") */
#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* sizes of the fixed name buffers embedded in struct rbd_device */
#define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN	64
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* snapshot name that selects the head (read/write) version of the image */
#define RBD_SNAP_HEAD_NAME	"-"

/* size of rbd_device.name */
#define DEV_NAME_LEN		32
127
/*
 * block device image metadata (in-memory version)
 *
 * Filled in by rbd_header_from_disk() from the little-endian on-disk
 * header; snapc, snap_names and snap_sizes are separately allocated and
 * released by rbd_header_free().
 */
struct rbd_image_header {
	u64 image_size;			/* image size in bytes */
	char block_name[32];		/* prefix of the segment object names */
	__u8 obj_order;			/* log2 of the segment (object) size */
	__u8 crypt_type;
	__u8 comp_type;
	struct rw_semaphore snap_rwsem;	/* guards the snapshot fields below */
	struct ceph_snap_context *snapc; /* snapshot ids, same order as names */
	size_t snap_names_len;		/* bytes in snap_names */
	u64 snap_seq;			/* snap_seq as read from the header */
	u32 total_snaps;		/* number of snapshots */

	char *snap_names;		/* packed NUL-terminated names */
	u64 *snap_sizes;		/* image size at each snapshot */
};
146
/*
 * an instance of the client.  multiple devices may share a client.
 */
struct rbd_client {
	struct ceph_client *client;	/* underlying ceph client */
	struct kref kref;		/* released via rbd_client_release() */
	struct list_head node;		/* entry in rbd_client_list */
};
155
/*
 * a single io request
 *
 * Allocated in rbd_do_request() and attached to the osd request as
 * r_priv; rbd_req_cb() frees it when the reply arrives.
 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;			/* bytes requested */
};
165
/*
 * a single device
 */
struct rbd_device {
	int id;			/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */
	struct request_queue *q;

	struct ceph_client *client;	/* same as rbd_client->client */
	struct rbd_client *rbd_client;	/* refcounted, possibly shared */

	char name[DEV_NAME_LEN];	/* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue lock */

	struct rbd_image_header header;
	char obj[RBD_MAX_OBJ_NAME_LEN];	/* rbd image name */
	int obj_len;
	char obj_md_name[RBD_MAX_MD_NAME_LEN];	/* hdr nm. */
	char pool_name[RBD_MAX_POOL_NAME_LEN];
	int poolid;

	char snap_name[RBD_MAX_SNAP_NAME_LEN];
	/*
	 * 0 when the head is mapped.  Otherwise rbd_header_set_snap()
	 * stores total_snaps minus the snapshot's index in the snap
	 * context; snap_index() converts it back to that index.
	 */
	u32 cur_snap;
	int read_only;		/* set when a snapshot is mapped */

	struct list_head node;	/* entry in rbd_dev_list */
};
197
/* NOTE(review): not statically initialized here - confirm it is set up at init */
static spinlock_t node_lock;      /* protects client get/put */

static struct class *class_rbd;	  /* /sys/class/rbd */
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);    /* devices */
static LIST_HEAD(rbd_client_list);      /* clients */
204
205
206static int rbd_open(struct block_device *bdev, fmode_t mode)
207{
208 struct gendisk *disk = bdev->bd_disk;
209 struct rbd_device *rbd_dev = disk->private_data;
210
211 set_device_ro(bdev, rbd_dev->read_only);
212
213 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
214 return -EROFS;
215
216 return 0;
217}
218
/* block device operations; only open is implemented */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
};
223
224/*
225 * Initialize an rbd client instance.
226 * We own *opt.
227 */
228static struct rbd_client *rbd_client_create(struct ceph_options *opt)
229{
230 struct rbd_client *rbdc;
231 int ret = -ENOMEM;
232
233 dout("rbd_client_create\n");
234 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
235 if (!rbdc)
236 goto out_opt;
237
238 kref_init(&rbdc->kref);
239 INIT_LIST_HEAD(&rbdc->node);
240
241 rbdc->client = ceph_create_client(opt, rbdc);
242 if (IS_ERR(rbdc->client))
243 goto out_rbdc;
244
245 ret = ceph_open_session(rbdc->client);
246 if (ret < 0)
247 goto out_err;
248
249 spin_lock(&node_lock);
250 list_add_tail(&rbdc->node, &rbd_client_list);
251 spin_unlock(&node_lock);
252
253 dout("rbd_client_create created %p\n", rbdc);
254 return rbdc;
255
256out_err:
257 ceph_destroy_client(rbdc->client);
258 return ERR_PTR(ret);
259
260out_rbdc:
261 kfree(rbdc);
262out_opt:
263 ceph_destroy_options(opt);
264 return ERR_PTR(-ENOMEM);
265}
266
267/*
268 * Find a ceph client with specific addr and configuration.
269 */
270static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
271{
272 struct rbd_client *client_node;
273
274 if (opt->flags & CEPH_OPT_NOSHARE)
275 return NULL;
276
277 list_for_each_entry(client_node, &rbd_client_list, node)
278 if (ceph_compare_options(opt, client_node->client) == 0)
279 return client_node;
280 return NULL;
281}
282
283/*
284 * Get a ceph client with specific addr and configuration, if one does
285 * not exist create it.
286 */
287static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
288 char *options)
289{
290 struct rbd_client *rbdc;
291 struct ceph_options *opt;
292 int ret;
293
294 ret = ceph_parse_options(&opt, options, mon_addr,
295 mon_addr + strlen(mon_addr), NULL, NULL);
296 if (ret < 0)
297 return ret;
298
299 spin_lock(&node_lock);
300 rbdc = __rbd_client_find(opt);
301 if (rbdc) {
302 ceph_destroy_options(opt);
303
304 /* using an existing client */
305 kref_get(&rbdc->kref);
306 rbd_dev->rbd_client = rbdc;
307 rbd_dev->client = rbdc->client;
308 spin_unlock(&node_lock);
309 return 0;
310 }
311 spin_unlock(&node_lock);
312
313 rbdc = rbd_client_create(opt);
314 if (IS_ERR(rbdc))
315 return PTR_ERR(rbdc);
316
317 rbd_dev->rbd_client = rbdc;
318 rbd_dev->client = rbdc->client;
319 return 0;
320}
321
322/*
323 * Destroy ceph client
324 */
325static void rbd_client_release(struct kref *kref)
326{
327 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
328
329 dout("rbd_release_client %p\n", rbdc);
330 spin_lock(&node_lock);
331 list_del(&rbdc->node);
332 spin_unlock(&node_lock);
333
334 ceph_destroy_client(rbdc->client);
335 kfree(rbdc);
336}
337
338/*
339 * Drop reference to ceph client node. If it's not referenced anymore, release
340 * it.
341 */
342static void rbd_put_client(struct rbd_device *rbd_dev)
343{
344 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
345 rbd_dev->rbd_client = NULL;
346 rbd_dev->client = NULL;
347}
348
349
350/*
351 * Create a new header structure, translate header format from the on-disk
352 * header.
353 */
354static int rbd_header_from_disk(struct rbd_image_header *header,
355 struct rbd_image_header_ondisk *ondisk,
356 int allocated_snaps,
357 gfp_t gfp_flags)
358{
359 int i;
360 u32 snap_count = le32_to_cpu(ondisk->snap_count);
361 int ret = -ENOMEM;
362
363 init_rwsem(&header->snap_rwsem);
364
365 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
366 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
367 snap_count *
368 sizeof(struct rbd_image_snap_ondisk),
369 gfp_flags);
370 if (!header->snapc)
371 return -ENOMEM;
372 if (snap_count) {
373 header->snap_names = kmalloc(header->snap_names_len,
374 GFP_KERNEL);
375 if (!header->snap_names)
376 goto err_snapc;
377 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
378 GFP_KERNEL);
379 if (!header->snap_sizes)
380 goto err_names;
381 } else {
382 header->snap_names = NULL;
383 header->snap_sizes = NULL;
384 }
385 memcpy(header->block_name, ondisk->block_name,
386 sizeof(ondisk->block_name));
387
388 header->image_size = le64_to_cpu(ondisk->image_size);
389 header->obj_order = ondisk->options.order;
390 header->crypt_type = ondisk->options.crypt_type;
391 header->comp_type = ondisk->options.comp_type;
392
393 atomic_set(&header->snapc->nref, 1);
394 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
395 header->snapc->num_snaps = snap_count;
396 header->total_snaps = snap_count;
397
398 if (snap_count &&
399 allocated_snaps == snap_count) {
400 for (i = 0; i < snap_count; i++) {
401 header->snapc->snaps[i] =
402 le64_to_cpu(ondisk->snaps[i].id);
403 header->snap_sizes[i] =
404 le64_to_cpu(ondisk->snaps[i].image_size);
405 }
406
407 /* copy snapshot names */
408 memcpy(header->snap_names, &ondisk->snaps[i],
409 header->snap_names_len);
410 }
411
412 return 0;
413
414err_names:
415 kfree(header->snap_names);
416err_snapc:
417 kfree(header->snapc);
418 return ret;
419}
420
421static int snap_index(struct rbd_image_header *header, int snap_num)
422{
423 return header->total_snaps - snap_num;
424}
425
426static u64 cur_snap_id(struct rbd_device *rbd_dev)
427{
428 struct rbd_image_header *header = &rbd_dev->header;
429
430 if (!rbd_dev->cur_snap)
431 return 0;
432
433 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
434}
435
436static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
437 u64 *seq, u64 *size)
438{
439 int i;
440 char *p = header->snap_names;
441
442 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
443 if (strcmp(snap_name, p) == 0)
444 break;
445 }
446 if (i == header->total_snaps)
447 return -ENOENT;
448 if (seq)
449 *seq = header->snapc->snaps[i];
450
451 if (size)
452 *size = header->snap_sizes[i];
453
454 return i;
455}
456
457static int rbd_header_set_snap(struct rbd_device *dev,
458 const char *snap_name,
459 u64 *size)
460{
461 struct rbd_image_header *header = &dev->header;
462 struct ceph_snap_context *snapc = header->snapc;
463 int ret = -ENOENT;
464
465 down_write(&header->snap_rwsem);
466
467 if (!snap_name ||
468 !*snap_name ||
469 strcmp(snap_name, "-") == 0 ||
470 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
471 if (header->total_snaps)
472 snapc->seq = header->snap_seq;
473 else
474 snapc->seq = 0;
475 dev->cur_snap = 0;
476 dev->read_only = 0;
477 if (size)
478 *size = header->image_size;
479 } else {
480 ret = snap_by_name(header, snap_name, &snapc->seq, size);
481 if (ret < 0)
482 goto done;
483
484 dev->cur_snap = header->total_snaps - ret;
485 dev->read_only = 1;
486 }
487
488 ret = 0;
489done:
490 up_write(&header->snap_rwsem);
491 return ret;
492}
493
494static void rbd_header_free(struct rbd_image_header *header)
495{
496 kfree(header->snapc);
497 kfree(header->snap_names);
498 kfree(header->snap_sizes);
499}
500
501/*
502 * get the actual striped segment name, offset and length
503 */
504static u64 rbd_get_segment(struct rbd_image_header *header,
505 const char *block_name,
506 u64 ofs, u64 len,
507 char *seg_name, u64 *segofs)
508{
509 u64 seg = ofs >> header->obj_order;
510
511 if (seg_name)
512 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
513 "%s.%012llx", block_name, seg);
514
515 ofs = ofs & ((1 << header->obj_order) - 1);
516 len = min_t(u64, len, (1 << header->obj_order) - ofs);
517
518 if (segofs)
519 *segofs = ofs;
520
521 return len;
522}
523
524/*
525 * bio helpers
526 */
527
528static void bio_chain_put(struct bio *chain)
529{
530 struct bio *tmp;
531
532 while (chain) {
533 tmp = chain;
534 chain = chain->bi_next;
535 bio_put(tmp);
536 }
537}
538
539/*
540 * zeros a bio chain, starting at specific offset
541 */
542static void zero_bio_chain(struct bio *chain, int start_ofs)
543{
544 struct bio_vec *bv;
545 unsigned long flags;
546 void *buf;
547 int i;
548 int pos = 0;
549
550 while (chain) {
551 bio_for_each_segment(bv, chain, i) {
552 if (pos + bv->bv_len > start_ofs) {
553 int remainder = max(start_ofs - pos, 0);
554 buf = bvec_kmap_irq(bv, &flags);
555 memset(buf + remainder, 0,
556 bv->bv_len - remainder);
557 bvec_kunmap_irq(bv, &flags);
558 }
559 pos += bv->bv_len;
560 }
561
562 chain = chain->bi_next;
563 }
564}
565
566/*
567 * bio_chain_clone - clone a chain of bios up to a certain length.
568 * might return a bio_pair that will need to be released.
569 */
570static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
571 struct bio_pair **bp,
572 int len, gfp_t gfpmask)
573{
574 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
575 int total = 0;
576
577 if (*bp) {
578 bio_pair_release(*bp);
579 *bp = NULL;
580 }
581
582 while (old_chain && (total < len)) {
583 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
584 if (!tmp)
585 goto err_out;
586
587 if (total + old_chain->bi_size > len) {
588 struct bio_pair *bp;
589
590 /*
591 * this split can only happen with a single paged bio,
592 * split_bio will BUG_ON if this is not the case
593 */
594 dout("bio_chain_clone split! total=%d remaining=%d"
595 "bi_size=%d\n",
596 (int)total, (int)len-total,
597 (int)old_chain->bi_size);
598
599 /* split the bio. We'll release it either in the next
600 call, or it will have to be released outside */
601 bp = bio_split(old_chain, (len - total) / 512ULL);
602 if (!bp)
603 goto err_out;
604
605 __bio_clone(tmp, &bp->bio1);
606
607 *next = &bp->bio2;
608 } else {
609 __bio_clone(tmp, old_chain);
610 *next = old_chain->bi_next;
611 }
612
613 tmp->bi_bdev = NULL;
614 gfpmask &= ~__GFP_WAIT;
615 tmp->bi_next = NULL;
616
617 if (!new_chain) {
618 new_chain = tail = tmp;
619 } else {
620 tail->bi_next = tmp;
621 tail = tmp;
622 }
623 old_chain = old_chain->bi_next;
624
625 total += tmp->bi_size;
626 }
627
628 BUG_ON(total < len);
629
630 if (tail)
631 tail->bi_next = NULL;
632
633 *old = old_chain;
634
635 return new_chain;
636
637err_out:
638 dout("bio_chain_clone with err\n");
639 bio_chain_put(new_chain);
640 return NULL;
641}
642
643/*
644 * helpers for osd request op vectors.
645 */
646static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
647 int num_ops,
648 int opcode,
649 u32 payload_len)
650{
651 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
652 GFP_NOIO);
653 if (!*ops)
654 return -ENOMEM;
655 (*ops)[0].op = opcode;
656 /*
657 * op extent offset and length will be set later on
658 * in calc_raw_layout()
659 */
660 (*ops)[0].payload_len = payload_len;
661 return 0;
662}
663
/*
 * Free an op vector obtained from rbd_create_rw_ops().
 */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	struct ceph_osd_req_op *vec = ops;

	kfree(vec);
}
668
669/*
670 * Send ceph osd request
671 */
672static int rbd_do_request(struct request *rq,
673 struct rbd_device *dev,
674 struct ceph_snap_context *snapc,
675 u64 snapid,
676 const char *obj, u64 ofs, u64 len,
677 struct bio *bio,
678 struct page **pages,
679 int num_pages,
680 int flags,
681 struct ceph_osd_req_op *ops,
682 int num_reply,
683 void (*rbd_cb)(struct ceph_osd_request *req,
684 struct ceph_msg *msg))
685{
686 struct ceph_osd_request *req;
687 struct ceph_file_layout *layout;
688 int ret;
689 u64 bno;
690 struct timespec mtime = CURRENT_TIME;
691 struct rbd_request *req_data;
692 struct ceph_osd_request_head *reqhead;
693 struct rbd_image_header *header = &dev->header;
694
695 ret = -ENOMEM;
696 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
697 if (!req_data)
698 goto done;
699
700 dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
701
702 down_read(&header->snap_rwsem);
703
704 req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
705 snapc,
706 ops,
707 false,
708 GFP_NOIO, pages, bio);
709 if (IS_ERR(req)) {
710 up_read(&header->snap_rwsem);
711 ret = PTR_ERR(req);
712 goto done_pages;
713 }
714
715 req->r_callback = rbd_cb;
716
717 req_data->rq = rq;
718 req_data->bio = bio;
719 req_data->pages = pages;
720 req_data->len = len;
721
722 req->r_priv = req_data;
723
724 reqhead = req->r_request->front.iov_base;
725 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
726
727 strncpy(req->r_oid, obj, sizeof(req->r_oid));
728 req->r_oid_len = strlen(req->r_oid);
729
730 layout = &req->r_file_layout;
731 memset(layout, 0, sizeof(*layout));
732 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
733 layout->fl_stripe_count = cpu_to_le32(1);
734 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
735 layout->fl_pg_preferred = cpu_to_le32(-1);
736 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
737 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
738 ofs, &len, &bno, req, ops);
739
740 ceph_osdc_build_request(req, ofs, &len,
741 ops,
742 snapc,
743 &mtime,
744 req->r_oid, req->r_oid_len);
745 up_read(&header->snap_rwsem);
746
747 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
748 if (ret < 0)
749 goto done_err;
750
751 if (!rbd_cb) {
752 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
753 ceph_osdc_put_request(req);
754 }
755 return ret;
756
757done_err:
758 bio_chain_put(req_data->bio);
759 ceph_osdc_put_request(req);
760done_pages:
761 kfree(req_data);
762done:
763 if (rq)
764 blk_end_request(rq, ret, len);
765 return ret;
766}
767
768/*
769 * Ceph osd op callback
770 */
771static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
772{
773 struct rbd_request *req_data = req->r_priv;
774 struct ceph_osd_reply_head *replyhead;
775 struct ceph_osd_op *op;
776 __s32 rc;
777 u64 bytes;
778 int read_op;
779
780 /* parse reply */
781 replyhead = msg->front.iov_base;
782 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
783 op = (void *)(replyhead + 1);
784 rc = le32_to_cpu(replyhead->result);
785 bytes = le64_to_cpu(op->extent.length);
786 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
787
788 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
789
790 if (rc == -ENOENT && read_op) {
791 zero_bio_chain(req_data->bio, 0);
792 rc = 0;
793 } else if (rc == 0 && read_op && bytes < req_data->len) {
794 zero_bio_chain(req_data->bio, bytes);
795 bytes = req_data->len;
796 }
797
798 blk_end_request(req_data->rq, rc, bytes);
799
800 if (req_data->bio)
801 bio_chain_put(req_data->bio);
802
803 ceph_osdc_put_request(req);
804 kfree(req_data);
805}
806
807/*
808 * Do a synchronous ceph osd operation
809 */
810static int rbd_req_sync_op(struct rbd_device *dev,
811 struct ceph_snap_context *snapc,
812 u64 snapid,
813 int opcode,
814 int flags,
815 struct ceph_osd_req_op *orig_ops,
816 int num_reply,
817 const char *obj,
818 u64 ofs, u64 len,
819 char *buf)
820{
821 int ret;
822 struct page **pages;
823 int num_pages;
824 struct ceph_osd_req_op *ops = orig_ops;
825 u32 payload_len;
826
827 num_pages = calc_pages_for(ofs , len);
828 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
829 if (!pages)
830 return -ENOMEM;
831
832 if (!orig_ops) {
833 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
834 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
835 if (ret < 0)
836 goto done;
837
838 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
839 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
840 if (ret < 0)
841 goto done_ops;
842 }
843 }
844
845 ret = rbd_do_request(NULL, dev, snapc, snapid,
846 obj, ofs, len, NULL,
847 pages, num_pages,
848 flags,
849 ops,
850 2,
851 NULL);
852 if (ret < 0)
853 goto done_ops;
854
855 if ((flags & CEPH_OSD_FLAG_READ) && buf)
856 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
857
858done_ops:
859 if (!orig_ops)
860 rbd_destroy_ops(ops);
861done:
862 ceph_release_page_vector(pages, num_pages);
863 return ret;
864}
865
866/*
867 * Do an asynchronous ceph osd operation
868 */
869static int rbd_do_op(struct request *rq,
870 struct rbd_device *rbd_dev ,
871 struct ceph_snap_context *snapc,
872 u64 snapid,
873 int opcode, int flags, int num_reply,
874 u64 ofs, u64 len,
875 struct bio *bio)
876{
877 char *seg_name;
878 u64 seg_ofs;
879 u64 seg_len;
880 int ret;
881 struct ceph_osd_req_op *ops;
882 u32 payload_len;
883
884 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
885 if (!seg_name)
886 return -ENOMEM;
887
888 seg_len = rbd_get_segment(&rbd_dev->header,
889 rbd_dev->header.block_name,
890 ofs, len,
891 seg_name, &seg_ofs);
892 if (seg_len < 0)
893 return seg_len;
894
895 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
896
897 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
898 if (ret < 0)
899 goto done;
900
901 /* we've taken care of segment sizes earlier when we
902 cloned the bios. We should never have a segment
903 truncated at this point */
904 BUG_ON(seg_len < len);
905
906 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
907 seg_name, seg_ofs, seg_len,
908 bio,
909 NULL, 0,
910 flags,
911 ops,
912 num_reply,
913 rbd_req_cb);
914done:
915 kfree(seg_name);
916 return ret;
917}
918
919/*
920 * Request async osd write
921 */
922static int rbd_req_write(struct request *rq,
923 struct rbd_device *rbd_dev,
924 struct ceph_snap_context *snapc,
925 u64 ofs, u64 len,
926 struct bio *bio)
927{
928 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
929 CEPH_OSD_OP_WRITE,
930 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
931 2,
932 ofs, len, bio);
933}
934
935/*
936 * Request async osd read
937 */
938static int rbd_req_read(struct request *rq,
939 struct rbd_device *rbd_dev,
940 u64 snapid,
941 u64 ofs, u64 len,
942 struct bio *bio)
943{
944 return rbd_do_op(rq, rbd_dev, NULL,
945 (snapid ? snapid : CEPH_NOSNAP),
946 CEPH_OSD_OP_READ,
947 CEPH_OSD_FLAG_READ,
948 2,
949 ofs, len, bio);
950}
951
952/*
953 * Request sync osd read
954 */
955static int rbd_req_sync_read(struct rbd_device *dev,
956 struct ceph_snap_context *snapc,
957 u64 snapid,
958 const char *obj,
959 u64 ofs, u64 len,
960 char *buf)
961{
962 return rbd_req_sync_op(dev, NULL,
963 (snapid ? snapid : CEPH_NOSNAP),
964 CEPH_OSD_OP_READ,
965 CEPH_OSD_FLAG_READ,
966 NULL,
967 1, obj, ofs, len, buf);
968}
969
970/*
971 * Request sync osd read
972 */
973static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
974 u64 snapid,
975 const char *obj)
976{
977 struct ceph_osd_req_op *ops;
978 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
979 if (ret < 0)
980 return ret;
981
982 ops[0].snap.snapid = snapid;
983
984 ret = rbd_req_sync_op(dev, NULL,
985 CEPH_NOSNAP,
986 0,
987 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
988 ops,
989 1, obj, 0, 0, NULL);
990
991 rbd_destroy_ops(ops);
992
993 if (ret < 0)
994 return ret;
995
996 return ret;
997}
998
999/*
1000 * Request sync osd read
1001 */
1002static int rbd_req_sync_exec(struct rbd_device *dev,
1003 const char *obj,
1004 const char *cls,
1005 const char *method,
1006 const char *data,
1007 int len)
1008{
1009 struct ceph_osd_req_op *ops;
1010 int cls_len = strlen(cls);
1011 int method_len = strlen(method);
1012 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1013 cls_len + method_len + len);
1014 if (ret < 0)
1015 return ret;
1016
1017 ops[0].cls.class_name = cls;
1018 ops[0].cls.class_len = (__u8)cls_len;
1019 ops[0].cls.method_name = method;
1020 ops[0].cls.method_len = (__u8)method_len;
1021 ops[0].cls.argc = 0;
1022 ops[0].cls.indata = data;
1023 ops[0].cls.indata_len = len;
1024
1025 ret = rbd_req_sync_op(dev, NULL,
1026 CEPH_NOSNAP,
1027 0,
1028 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1029 ops,
1030 1, obj, 0, 0, NULL);
1031
1032 rbd_destroy_ops(ops);
1033
1034 dout("cls_exec returned %d\n", ret);
1035 return ret;
1036}
1037
1038/*
1039 * block device queue callback
1040 */
1041static void rbd_rq_fn(struct request_queue *q)
1042{
1043 struct rbd_device *rbd_dev = q->queuedata;
1044 struct request *rq;
1045 struct bio_pair *bp = NULL;
1046
1047 rq = blk_fetch_request(q);
1048
1049 while (1) {
1050 struct bio *bio;
1051 struct bio *rq_bio, *next_bio = NULL;
1052 bool do_write;
1053 int size, op_size = 0;
1054 u64 ofs;
1055
1056 /* peek at request from block layer */
1057 if (!rq)
1058 break;
1059
1060 dout("fetched request\n");
1061
1062 /* filter out block requests we don't understand */
1063 if ((rq->cmd_type != REQ_TYPE_FS)) {
1064 __blk_end_request_all(rq, 0);
1065 goto next;
1066 }
1067
1068 /* deduce our operation (read, write) */
1069 do_write = (rq_data_dir(rq) == WRITE);
1070
1071 size = blk_rq_bytes(rq);
1072 ofs = blk_rq_pos(rq) * 512ULL;
1073 rq_bio = rq->bio;
1074 if (do_write && rbd_dev->read_only) {
1075 __blk_end_request_all(rq, -EROFS);
1076 goto next;
1077 }
1078
1079 spin_unlock_irq(q->queue_lock);
1080
1081 dout("%s 0x%x bytes at 0x%llx\n",
1082 do_write ? "write" : "read",
1083 size, blk_rq_pos(rq) * 512ULL);
1084
1085 do {
1086 /* a bio clone to be passed down to OSD req */
1087 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1088 op_size = rbd_get_segment(&rbd_dev->header,
1089 rbd_dev->header.block_name,
1090 ofs, size,
1091 NULL, NULL);
1092 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1093 op_size, GFP_ATOMIC);
1094 if (!bio) {
1095 spin_lock_irq(q->queue_lock);
1096 __blk_end_request_all(rq, -ENOMEM);
1097 goto next;
1098 }
1099
1100 /* init OSD command: write or read */
1101 if (do_write)
1102 rbd_req_write(rq, rbd_dev,
1103 rbd_dev->header.snapc,
1104 ofs,
1105 op_size, bio);
1106 else
1107 rbd_req_read(rq, rbd_dev,
1108 cur_snap_id(rbd_dev),
1109 ofs,
1110 op_size, bio);
1111
1112 size -= op_size;
1113 ofs += op_size;
1114
1115 rq_bio = next_bio;
1116 } while (size > 0);
1117
1118 if (bp)
1119 bio_pair_release(bp);
1120
1121 spin_lock_irq(q->queue_lock);
1122next:
1123 rq = blk_fetch_request(q);
1124 }
1125}
1126
1127/*
1128 * a queue callback. Makes sure that we don't create a bio that spans across
1129 * multiple osd objects. One exception would be with a single page bios,
1130 * which we handle later at bio_chain_clone
1131 */
1132static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1133 struct bio_vec *bvec)
1134{
1135 struct rbd_device *rbd_dev = q->queuedata;
1136 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1137 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1138 unsigned int bio_sectors = bmd->bi_size >> 9;
1139 int max;
1140
1141 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1142 + bio_sectors)) << 9;
1143 if (max < 0)
1144 max = 0; /* bio_add cannot handle a negative return */
1145 if (max <= bvec->bv_len && bio_sectors == 0)
1146 return bvec->bv_len;
1147 return max;
1148}
1149
1150static void rbd_free_disk(struct rbd_device *rbd_dev)
1151{
1152 struct gendisk *disk = rbd_dev->disk;
1153
1154 if (!disk)
1155 return;
1156
1157 rbd_header_free(&rbd_dev->header);
1158
1159 if (disk->flags & GENHD_FL_UP)
1160 del_gendisk(disk);
1161 if (disk->queue)
1162 blk_cleanup_queue(disk->queue);
1163 put_disk(disk);
1164}
1165
1166/*
1167 * reload the ondisk the header
1168 */
1169static int rbd_read_header(struct rbd_device *rbd_dev,
1170 struct rbd_image_header *header)
1171{
1172 ssize_t rc;
1173 struct rbd_image_header_ondisk *dh;
1174 int snap_count = 0;
1175 u64 snap_names_len = 0;
1176
1177 while (1) {
1178 int len = sizeof(*dh) +
1179 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1180 snap_names_len;
1181
1182 rc = -ENOMEM;
1183 dh = kmalloc(len, GFP_KERNEL);
1184 if (!dh)
1185 return -ENOMEM;
1186
1187 rc = rbd_req_sync_read(rbd_dev,
1188 NULL, CEPH_NOSNAP,
1189 rbd_dev->obj_md_name,
1190 0, len,
1191 (char *)dh);
1192 if (rc < 0)
1193 goto out_dh;
1194
1195 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1196 if (rc < 0)
1197 goto out_dh;
1198
1199 if (snap_count != header->total_snaps) {
1200 snap_count = header->total_snaps;
1201 snap_names_len = header->snap_names_len;
1202 rbd_header_free(header);
1203 kfree(dh);
1204 continue;
1205 }
1206 break;
1207 }
1208
1209out_dh:
1210 kfree(dh);
1211 return rc;
1212}
1213
1214/*
1215 * create a snapshot
1216 */
1217static int rbd_header_add_snap(struct rbd_device *dev,
1218 const char *snap_name,
1219 gfp_t gfp_flags)
1220{
1221 int name_len = strlen(snap_name);
1222 u64 new_snapid;
1223 int ret;
1224 void *data, *data_start, *data_end;
1225
1226 /* we should create a snapshot only if we're pointing at the head */
1227 if (dev->cur_snap)
1228 return -EINVAL;
1229
1230 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1231 &new_snapid);
1232 dout("created snapid=%lld\n", new_snapid);
1233 if (ret < 0)
1234 return ret;
1235
1236 data = kmalloc(name_len + 16, gfp_flags);
1237 if (!data)
1238 return -ENOMEM;
1239
1240 data_start = data;
1241 data_end = data + name_len + 16;
1242
1243 ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1244 ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1245
1246 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1247 data_start, data - data_start);
1248
1249 kfree(data_start);
1250
1251 if (ret < 0)
1252 return ret;
1253
1254 dev->header.snapc->seq = new_snapid;
1255
1256 return 0;
1257bad:
1258 return -ERANGE;
1259}
1260
1261/*
1262 * only read the first part of the ondisk header, without the snaps info
1263 */
1264static int rbd_update_snaps(struct rbd_device *rbd_dev)
1265{
1266 int ret;
1267 struct rbd_image_header h;
1268 u64 snap_seq;
1269
1270 ret = rbd_read_header(rbd_dev, &h);
1271 if (ret < 0)
1272 return ret;
1273
1274 down_write(&rbd_dev->header.snap_rwsem);
1275
1276 snap_seq = rbd_dev->header.snapc->seq;
1277
1278 kfree(rbd_dev->header.snapc);
1279 kfree(rbd_dev->header.snap_names);
1280 kfree(rbd_dev->header.snap_sizes);
1281
1282 rbd_dev->header.total_snaps = h.total_snaps;
1283 rbd_dev->header.snapc = h.snapc;
1284 rbd_dev->header.snap_names = h.snap_names;
1285 rbd_dev->header.snap_sizes = h.snap_sizes;
1286 rbd_dev->header.snapc->seq = snap_seq;
1287
1288 up_write(&rbd_dev->header.snap_rwsem);
1289
1290 return 0;
1291}
1292
1293static int rbd_init_disk(struct rbd_device *rbd_dev)
1294{
1295 struct gendisk *disk;
1296 struct request_queue *q;
1297 int rc;
1298 u64 total_size = 0;
1299
1300 /* contact OSD, request size info about the object being mapped */
1301 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1302 if (rc)
1303 return rc;
1304
1305 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1306 if (rc)
1307 return rc;
1308
1309 /* create gendisk info */
1310 rc = -ENOMEM;
1311 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1312 if (!disk)
1313 goto out;
1314
1315 sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1316 disk->major = rbd_dev->major;
1317 disk->first_minor = 0;
1318 disk->fops = &rbd_bd_ops;
1319 disk->private_data = rbd_dev;
1320
1321 /* init rq */
1322 rc = -ENOMEM;
1323 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1324 if (!q)
1325 goto out_disk;
1326 blk_queue_merge_bvec(q, rbd_merge_bvec);
1327 disk->queue = q;
1328
1329 q->queuedata = rbd_dev;
1330
1331 rbd_dev->disk = disk;
1332 rbd_dev->q = q;
1333
1334 /* finally, announce the disk to the world */
1335 set_capacity(disk, total_size / 512ULL);
1336 add_disk(disk);
1337
1338 pr_info("%s: added with size 0x%llx\n",
1339 disk->disk_name, (unsigned long long)total_size);
1340 return 0;
1341
1342out_disk:
1343 put_disk(disk);
1344out:
1345 return rc;
1346}
1347
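/********************************************************************
 * /sys/class/rbd/
 *                    add            map rados objects to blkdev
 *                    remove         unmap rados objects
 *                    list           show mappings
 *                    snaps_refresh  re-read the snapshot info of a mapping
 *                    snap_create    create a snapshot of a mapped image
 *                    snaps_list     show snapshots of all mappings
 *                    snap_rollback  roll a mapped image back to a snapshot
 *******************************************************************/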
1354
/*
 * Release callback installed as class_rbd->class_release: frees the
 * struct class that rbd_sysfs_init() allocated with kzalloc().
 */
static void class_rbd_release(struct class *cls)
{
	kfree(cls);
}
1359
1360static ssize_t class_rbd_list(struct class *c,
1361 struct class_attribute *attr,
1362 char *data)
1363{
1364 int n = 0;
1365 struct list_head *tmp;
1366 int max = PAGE_SIZE;
1367
1368 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1369
1370 n += snprintf(data, max,
1371 "#id\tmajor\tclient_name\tpool\tname\tsnap\tKB\n");
1372
1373 list_for_each(tmp, &rbd_dev_list) {
1374 struct rbd_device *rbd_dev;
1375
1376 rbd_dev = list_entry(tmp, struct rbd_device, node);
1377 n += snprintf(data+n, max-n,
1378 "%d\t%d\tclient%lld\t%s\t%s\t%s\t%lld\n",
1379 rbd_dev->id,
1380 rbd_dev->major,
1381 ceph_client_id(rbd_dev->client),
1382 rbd_dev->pool_name,
1383 rbd_dev->obj, rbd_dev->snap_name,
1384 rbd_dev->header.image_size >> 10);
1385 if (n == max)
1386 break;
1387 }
1388
1389 mutex_unlock(&ctl_mutex);
1390 return n;
1391}
1392
1393static ssize_t class_rbd_add(struct class *c,
1394 struct class_attribute *attr,
1395 const char *buf, size_t count)
1396{
1397 struct ceph_osd_client *osdc;
1398 struct rbd_device *rbd_dev;
1399 ssize_t rc = -ENOMEM;
1400 int irc, new_id = 0;
1401 struct list_head *tmp;
1402 char *mon_dev_name;
1403 char *options;
1404
1405 if (!try_module_get(THIS_MODULE))
1406 return -ENODEV;
1407
1408 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1409 if (!mon_dev_name)
1410 goto err_out_mod;
1411
1412 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1413 if (!options)
1414 goto err_mon_dev;
1415
1416 /* new rbd_device object */
1417 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1418 if (!rbd_dev)
1419 goto err_out_opt;
1420
1421 /* static rbd_device initialization */
1422 spin_lock_init(&rbd_dev->lock);
1423 INIT_LIST_HEAD(&rbd_dev->node);
1424
1425 /* generate unique id: find highest unique id, add one */
1426 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1427
1428 list_for_each(tmp, &rbd_dev_list) {
1429 struct rbd_device *rbd_dev;
1430
1431 rbd_dev = list_entry(tmp, struct rbd_device, node);
1432 if (rbd_dev->id >= new_id)
1433 new_id = rbd_dev->id + 1;
1434 }
1435
1436 rbd_dev->id = new_id;
1437
1438 /* add to global list */
1439 list_add_tail(&rbd_dev->node, &rbd_dev_list);
1440
1441 /* parse add command */
1442 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1443 "%" __stringify(RBD_MAX_OPT_LEN) "s "
1444 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1445 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1446 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1447 mon_dev_name, options, rbd_dev->pool_name,
1448 rbd_dev->obj, rbd_dev->snap_name) < 4) {
1449 rc = -EINVAL;
1450 goto err_out_slot;
1451 }
1452
1453 if (rbd_dev->snap_name[0] == 0)
1454 rbd_dev->snap_name[0] = '-';
1455
1456 rbd_dev->obj_len = strlen(rbd_dev->obj);
1457 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1458 rbd_dev->obj, RBD_SUFFIX);
1459
1460 /* initialize rest of new object */
1461 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1462 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1463 if (rc < 0)
1464 goto err_out_slot;
1465
1466 mutex_unlock(&ctl_mutex);
1467
1468 /* pick the pool */
1469 osdc = &rbd_dev->client->osdc;
1470 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1471 if (rc < 0)
1472 goto err_out_client;
1473 rbd_dev->poolid = rc;
1474
1475 /* register our block device */
1476 irc = register_blkdev(0, rbd_dev->name);
1477 if (irc < 0) {
1478 rc = irc;
1479 goto err_out_client;
1480 }
1481 rbd_dev->major = irc;
1482
1483 /* set up and announce blkdev mapping */
1484 rc = rbd_init_disk(rbd_dev);
1485 if (rc)
1486 goto err_out_blkdev;
1487
1488 return count;
1489
1490err_out_blkdev:
1491 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1492err_out_client:
1493 rbd_put_client(rbd_dev);
1494 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1495err_out_slot:
1496 list_del_init(&rbd_dev->node);
1497 mutex_unlock(&ctl_mutex);
1498
1499 kfree(rbd_dev);
1500err_out_opt:
1501 kfree(options);
1502err_mon_dev:
1503 kfree(mon_dev_name);
1504err_out_mod:
1505 dout("Error adding device %s\n", buf);
1506 module_put(THIS_MODULE);
1507 return rc;
1508}
1509
1510static struct rbd_device *__rbd_get_dev(unsigned long id)
1511{
1512 struct list_head *tmp;
1513 struct rbd_device *rbd_dev;
1514
1515 list_for_each(tmp, &rbd_dev_list) {
1516 rbd_dev = list_entry(tmp, struct rbd_device, node);
1517 if (rbd_dev->id == id)
1518 return rbd_dev;
1519 }
1520 return NULL;
1521}
1522
1523static ssize_t class_rbd_remove(struct class *c,
1524 struct class_attribute *attr,
1525 const char *buf,
1526 size_t count)
1527{
1528 struct rbd_device *rbd_dev = NULL;
1529 int target_id, rc;
1530 unsigned long ul;
1531
1532 rc = strict_strtoul(buf, 10, &ul);
1533 if (rc)
1534 return rc;
1535
1536 /* convert to int; abort if we lost anything in the conversion */
1537 target_id = (int) ul;
1538 if (target_id != ul)
1539 return -EINVAL;
1540
1541 /* remove object from list immediately */
1542 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1543
1544 rbd_dev = __rbd_get_dev(target_id);
1545 if (rbd_dev)
1546 list_del_init(&rbd_dev->node);
1547
1548 mutex_unlock(&ctl_mutex);
1549
1550 if (!rbd_dev)
1551 return -ENOENT;
1552
1553 rbd_put_client(rbd_dev);
1554
1555 /* clean up and free blkdev */
1556 rbd_free_disk(rbd_dev);
1557 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1558 kfree(rbd_dev);
1559
1560 /* release module ref */
1561 module_put(THIS_MODULE);
1562
1563 return count;
1564}
1565
1566static ssize_t class_rbd_snaps_list(struct class *c,
1567 struct class_attribute *attr,
1568 char *data)
1569{
1570 struct rbd_device *rbd_dev = NULL;
1571 struct list_head *tmp;
1572 struct rbd_image_header *header;
1573 int i, n = 0, max = PAGE_SIZE;
1574 int ret;
1575
1576 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1577
1578 n += snprintf(data, max, "#id\tsnap\tKB\n");
1579
1580 list_for_each(tmp, &rbd_dev_list) {
1581 char *names, *p;
1582 struct ceph_snap_context *snapc;
1583
1584 rbd_dev = list_entry(tmp, struct rbd_device, node);
1585 header = &rbd_dev->header;
1586
1587 down_read(&header->snap_rwsem);
1588
1589 names = header->snap_names;
1590 snapc = header->snapc;
1591
1592 n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1593 rbd_dev->id, RBD_SNAP_HEAD_NAME,
1594 header->image_size >> 10,
1595 (!rbd_dev->cur_snap ? " (*)" : ""));
1596 if (n == max)
1597 break;
1598
1599 p = names;
1600 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
1601 n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1602 rbd_dev->id, p, header->snap_sizes[i] >> 10,
1603 (rbd_dev->cur_snap &&
1604 (snap_index(header, i) == rbd_dev->cur_snap) ?
1605 " (*)" : ""));
1606 if (n == max)
1607 break;
1608 }
1609
1610 up_read(&header->snap_rwsem);
1611 }
1612
1613
1614 ret = n;
1615 mutex_unlock(&ctl_mutex);
1616 return ret;
1617}
1618
1619static ssize_t class_rbd_snaps_refresh(struct class *c,
1620 struct class_attribute *attr,
1621 const char *buf,
1622 size_t count)
1623{
1624 struct rbd_device *rbd_dev = NULL;
1625 int target_id, rc;
1626 unsigned long ul;
1627 int ret = count;
1628
1629 rc = strict_strtoul(buf, 10, &ul);
1630 if (rc)
1631 return rc;
1632
1633 /* convert to int; abort if we lost anything in the conversion */
1634 target_id = (int) ul;
1635 if (target_id != ul)
1636 return -EINVAL;
1637
1638 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1639
1640 rbd_dev = __rbd_get_dev(target_id);
1641 if (!rbd_dev) {
1642 ret = -ENOENT;
1643 goto done;
1644 }
1645
1646 rc = rbd_update_snaps(rbd_dev);
1647 if (rc < 0)
1648 ret = rc;
1649
1650done:
1651 mutex_unlock(&ctl_mutex);
1652 return ret;
1653}
1654
1655static ssize_t class_rbd_snap_create(struct class *c,
1656 struct class_attribute *attr,
1657 const char *buf,
1658 size_t count)
1659{
1660 struct rbd_device *rbd_dev = NULL;
1661 int target_id, ret;
1662 char *name;
1663
1664 name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL);
1665 if (!name)
1666 return -ENOMEM;
1667
1668 /* parse snaps add command */
1669 if (sscanf(buf, "%d "
1670 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1671 &target_id,
1672 name) != 2) {
1673 ret = -EINVAL;
1674 goto done;
1675 }
1676
1677 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1678
1679 rbd_dev = __rbd_get_dev(target_id);
1680 if (!rbd_dev) {
1681 ret = -ENOENT;
1682 goto done_unlock;
1683 }
1684
1685 ret = rbd_header_add_snap(rbd_dev,
1686 name, GFP_KERNEL);
1687 if (ret < 0)
1688 goto done_unlock;
1689
1690 ret = rbd_update_snaps(rbd_dev);
1691 if (ret < 0)
1692 goto done_unlock;
1693
1694 ret = count;
1695done_unlock:
1696 mutex_unlock(&ctl_mutex);
1697done:
1698 kfree(name);
1699 return ret;
1700}
1701
1702static ssize_t class_rbd_rollback(struct class *c,
1703 struct class_attribute *attr,
1704 const char *buf,
1705 size_t count)
1706{
1707 struct rbd_device *rbd_dev = NULL;
1708 int target_id, ret;
1709 u64 snapid;
1710 char snap_name[RBD_MAX_SNAP_NAME_LEN];
1711 u64 cur_ofs;
1712 char *seg_name;
1713
1714 /* parse snaps add command */
1715 if (sscanf(buf, "%d "
1716 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1717 &target_id,
1718 snap_name) != 2) {
1719 return -EINVAL;
1720 }
1721
1722 ret = -ENOMEM;
1723 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1724 if (!seg_name)
1725 return ret;
1726
1727 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1728
1729 rbd_dev = __rbd_get_dev(target_id);
1730 if (!rbd_dev) {
1731 ret = -ENOENT;
1732 goto done_unlock;
1733 }
1734
1735 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1736 if (ret < 0)
1737 goto done_unlock;
1738
1739 dout("snapid=%lld\n", snapid);
1740
1741 cur_ofs = 0;
1742 while (cur_ofs < rbd_dev->header.image_size) {
1743 cur_ofs += rbd_get_segment(&rbd_dev->header,
1744 rbd_dev->obj,
1745 cur_ofs, (u64)-1,
1746 seg_name, NULL);
1747 dout("seg_name=%s\n", seg_name);
1748
1749 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1750 if (ret < 0)
1751 pr_warning("could not roll back obj %s err=%d\n",
1752 seg_name, ret);
1753 }
1754
1755 ret = rbd_update_snaps(rbd_dev);
1756 if (ret < 0)
1757 goto done_unlock;
1758
1759 ret = count;
1760
1761done_unlock:
1762 mutex_unlock(&ctl_mutex);
1763 kfree(seg_name);
1764
1765 return ret;
1766}
1767
1768static struct class_attribute class_rbd_attrs[] = {
1769 __ATTR(add, 0200, NULL, class_rbd_add),
1770 __ATTR(remove, 0200, NULL, class_rbd_remove),
1771 __ATTR(list, 0444, class_rbd_list, NULL),
1772 __ATTR(snaps_refresh, 0200, NULL, class_rbd_snaps_refresh),
1773 __ATTR(snap_create, 0200, NULL, class_rbd_snap_create),
1774 __ATTR(snaps_list, 0444, class_rbd_snaps_list, NULL),
1775 __ATTR(snap_rollback, 0200, NULL, class_rbd_rollback),
1776 __ATTR_NULL
1777};
1778
1779/*
1780 * create control files in sysfs
1781 * /sys/class/rbd/...
1782 */
1783static int rbd_sysfs_init(void)
1784{
1785 int ret = -ENOMEM;
1786
1787 class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
1788 if (!class_rbd)
1789 goto out;
1790
1791 class_rbd->name = DRV_NAME;
1792 class_rbd->owner = THIS_MODULE;
1793 class_rbd->class_release = class_rbd_release;
1794 class_rbd->class_attrs = class_rbd_attrs;
1795
1796 ret = class_register(class_rbd);
1797 if (ret)
1798 goto out_class;
1799 return 0;
1800
1801out_class:
1802 kfree(class_rbd);
1803 class_rbd = NULL;
1804 pr_err(DRV_NAME ": failed to create class rbd\n");
1805out:
1806 return ret;
1807}
1808
1809static void rbd_sysfs_cleanup(void)
1810{
1811 if (class_rbd)
1812 class_destroy(class_rbd);
1813 class_rbd = NULL;
1814}
1815
1816int __init rbd_init(void)
1817{
1818 int rc;
1819
1820 rc = rbd_sysfs_init();
1821 if (rc)
1822 return rc;
1823 spin_lock_init(&node_lock);
1824 pr_info("loaded " DRV_NAME_LONG "\n");
1825 return 0;
1826}
1827
1828void __exit rbd_exit(void)
1829{
1830 rbd_sysfs_cleanup();
1831}
1832
1833module_init(rbd_init);
1834module_exit(rbd_exit);
1835
1836MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
1837MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
1838MODULE_DESCRIPTION("rados block device");
1839
1840/* following authorship retained from original osdblk.c */
1841MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
1842
1843MODULE_LICENSE("GPL");