1
2/*
3   rbd.c -- Export ceph rados objects as a Linux block device
4
5
6   based on drivers/block/osdblk.c:
7
8   Copyright 2009 Red Hat, Inc.
9
10   This program is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation.
13
14   This program is distributed in the hope that it will be useful,
15   but WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   GNU General Public License for more details.
18
19   You should have received a copy of the GNU General Public License
20   along with this program; see the file COPYING.  If not, write to
21   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25   For usage instructions, please refer to:
26
27                 Documentation/ABI/testing/sysfs-bus-rbd
28
29 */
30
31#include <linux/ceph/libceph.h>
32#include <linux/ceph/osd_client.h>
33#include <linux/ceph/mon_client.h>
34#include <linux/ceph/cls_lock_client.h>
35#include <linux/ceph/striper.h>
36#include <linux/ceph/decode.h>
37#include <linux/fs_parser.h>
38#include <linux/bsearch.h>
39
40#include <linux/kernel.h>
41#include <linux/device.h>
42#include <linux/module.h>
43#include <linux/blk-mq.h>
44#include <linux/fs.h>
45#include <linux/blkdev.h>
46#include <linux/slab.h>
47#include <linux/idr.h>
48#include <linux/workqueue.h>
49
50#include "rbd_types.h"
51
/* While this is defined, rbd_assert() expands to a check that printk()s and BUG()s. */
#define RBD_DEBUG	/* Activate rbd_assert() calls */
53
54/*
55 * Increment the given counter and return its updated value.
56 * If the counter is already 0 it will not be incremented.
57 * If the counter is already at its maximum value returns
58 * -EINVAL without updating it.
59 */
60static int atomic_inc_return_safe(atomic_t *v)
61{
62	unsigned int counter;
63
64	counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
65	if (counter <= (unsigned int)INT_MAX)
66		return (int)counter;
67
68	atomic_dec(v);
69
70	return -EINVAL;
71}
72
73/* Decrement the counter.  Return the resulting value, or -EINVAL */
74static int atomic_dec_return_safe(atomic_t *v)
75{
76	int counter;
77
78	counter = atomic_dec_return(v);
79	if (counter >= 0)
80		return counter;
81
82	atomic_inc(v);
83
84	return -EINVAL;
85}
86
/* Driver name; used as the prefix of every rbd_warn() message */
#define RBD_DRV_NAME "rbd"

/*
 * Minor-number layout: rbd_dev_id_to_minor() and minor_to_rbd_dev_id()
 * shift by RBD_SINGLE_MAJOR_PART_SHIFT, so consecutive device ids are
 * 1 << 4 = 16 minors apart.
 */
#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

/* Longest snapshot name such that prefix + name still fits in NAME_MAX */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

/* Name rbd_snap_name() returns for CEPH_NOSNAP */
#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)	/* 1000 ms, in jiffies */
112
/*
 * Feature bits.  Each is a single bit of a 64-bit mask (the in-memory
 * header keeps such a mask in rbd_image_header.features).  Bit 6 has
 * no definition in this list.
 */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_OBJECT_MAP		(1ULL<<3)
#define RBD_FEATURE_FAST_DIFF		(1ULL<<4)
#define RBD_FEATURE_DEEP_FLATTEN	(1ULL<<5)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

/* Union of every feature bit defined above */
#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_OBJECT_MAP |	\
				 RBD_FEATURE_FAST_DIFF |	\
				 RBD_FEATURE_DEEP_FLATTEN |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/*
 * Features supported by this (client software) implementation.
 * Currently identical to RBD_FEATURES_ALL; this is the mask printed by
 * the "supported_features" bus attribute.
 */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
136
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 *
 * DEV_NAME_LEN is the size of the rbd_device.name buffer.
 */
#define DEV_NAME_LEN		32
142
/*
 * block device image metadata (in-memory version)
 *
 * The pointer members (object_prefix, snapc, snap_names, snap_sizes)
 * are released and the whole structure zeroed by
 * rbd_image_header_cleanup().
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;		/* log2 of object size, see rbd_obj_bytes() */
	u64 stripe_unit;	/* 0 is replaced by rbd_init_layout() */
	u64 stripe_count;	/* 0 is replaced by rbd_init_layout() */
	s64 data_pool_id;	/* CEPH_NOPOOL: fall back to spec->pool_id */
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
161
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;
	const char	*pool_ns;	/* NULL if default, never "" */

	const char	*image_id;
	const char	*image_name;	/* may be NULL, see note above */

	u64		snap_id;	/* CEPH_NOSNAP when not a snapshot (rbd_is_snap()) */
	const char	*snap_name;

	struct kref	kref;		/* the spec may be shared, see above */
};
200
/*
 * an instance of the client.  multiple devices may share an rbd client.
 *
 * Created by rbd_client_create(), looked up and shared through
 * rbd_client_find(), and torn down by rbd_client_release() once the
 * last reference is dropped with rbd_put_client().
 */
struct rbd_client {
	struct ceph_client	*client;	/* underlying libceph client */
	struct kref		kref;		/* see __rbd_get_client()/rbd_put_client() */
	struct list_head	node;		/* entry on rbd_client_list */
};
209
/*
 * Tracks a group of outstanding sub-operations; updated one completion
 * at a time by pending_result_dec().
 */
struct pending_result {
	int			result;		/* first nonzero result */
	int			num_pending;	/* completions still outstanding */
};
214
/* Forward declaration: struct rbd_obj_request below holds a pointer to it */
struct rbd_img_request;
216
/* Kind of data buffer attached to a request (type of rbd_img_request.data_type) */
enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};
223
/*
 * Operation carried by an image request (rbd_img_request.op_type).
 * obj_op_name() maps each value to a printable name.
 */
enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
	OBJ_OP_ZEROOUT,
};
230
/* Bit masks for rbd_obj_request.flags */
#define RBD_OBJ_FLAG_DELETION			(1U << 0)
#define RBD_OBJ_FLAG_COPYUP_ENABLED		(1U << 1)
#define RBD_OBJ_FLAG_COPYUP_ZEROS		(1U << 2)
#define RBD_OBJ_FLAG_MAY_EXIST			(1U << 3)
#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT	(1U << 4)
236
/* Values of rbd_obj_request.read_state (used for reads) */
enum rbd_obj_read_state {
	RBD_OBJ_READ_START = 1,
	RBD_OBJ_READ_OBJECT,
	RBD_OBJ_READ_PARENT,
};
242
/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
 *            .                 |                                    .
 *            .                 v                                    .
 *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
 *            .                 |                    .               .
 *            .                 v                    v (deep-copyup  .
 *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
 * flattened) v                 |                    .               .
 *            .                 v                    .               .
 *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
 *                              |                        not needed) v
 *                              v                                    .
 *                            done . . . . . . . . . . . . . . . . . .
 *                              ^
 *                              |
 *                     RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * assert_exists guard is needed or not (in some cases it's not needed
 * even if there is a parent).
 *
 * NOTE(review): the state names used in the diagram above
 * (RBD_OBJ_WRITE_GUARD, _READ_FROM_PARENT, _COPYUP_EMPTY_SNAPC,
 * _COPYUP_OPS, _FLAT) are not members of the enum below, so the
 * diagram looks stale -- confirm against the write state handler and
 * update it.
 */
/* Values of rbd_obj_request.write_state (used for writes) */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_START = 1,
	RBD_OBJ_WRITE_PRE_OBJECT_MAP,
	RBD_OBJ_WRITE_OBJECT,
	__RBD_OBJ_WRITE_COPYUP,
	RBD_OBJ_WRITE_COPYUP,
	RBD_OBJ_WRITE_POST_OBJECT_MAP,
};
276
/* Values of rbd_obj_request.copyup_state */
enum rbd_obj_copyup_state {
	RBD_OBJ_COPYUP_START = 1,
	RBD_OBJ_COPYUP_READ_PARENT,
	__RBD_OBJ_COPYUP_OBJECT_MAPS,
	RBD_OBJ_COPYUP_OBJECT_MAPS,
	__RBD_OBJ_COPYUP_WRITE_OBJECT,
	RBD_OBJ_COPYUP_WRITE_OBJECT,
};
285
/*
 * A request against a single object.  Object requests hang off their
 * image request's object_extents list through ex.oe_item (see
 * for_each_obj_request()).
 */
struct rbd_obj_request {
	struct ceph_object_extent ex;
	unsigned int		flags;	/* RBD_OBJ_FLAG_* */
	/* read and write state share storage: a request is one or the other */
	union {
		enum rbd_obj_read_state	 read_state;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;	/* owning image request */
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	/* data position: either a bio iterator or a bio_vec iterator */
	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};

	enum rbd_obj_copyup_state copyup_state;
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct list_head	osd_reqs;	/* w/ r_private_item */

	struct mutex		state_mutex;
	struct pending_result	pending;	/* see pending_result_dec() */
	struct kref		kref;
};
317
/* NOTE(review): presumably bit numbers within rbd_img_request.flags -- confirm */
enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};
322
/* Values of rbd_img_request.state */
enum rbd_img_state {
	RBD_IMG_START = 1,
	RBD_IMG_EXCLUSIVE_LOCK,
	__RBD_IMG_OBJECT_REQUESTS,
	RBD_IMG_OBJECT_REQUESTS,
};
329
/*
 * A request against an image.  Its per-object pieces are the
 * rbd_obj_request structures linked on object_extents.
 */
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;
	enum rbd_img_state	state;
	/* reads carry a snapshot id, writes a snapshot context */
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	struct rbd_obj_request	*obj_request;	/* obj req initiator */

	struct list_head	lock_item;
	struct list_head	object_extents;	/* obj_req.ex structs */

	struct mutex		state_mutex;
	struct pending_result	pending;	/* see pending_result_dec() */
	struct work_struct	work;
	int			work_result;
};
350
/*
 * Walk every object request (oreq) of image request ireq by following
 * ireq->object_extents through each request's ex.oe_item link.  The
 * _safe variant is built on list_for_each_entry_safe() and needs a
 * second cursor, n.
 */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
355
/* Values of rbd_device.watch_state */
enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};
361
/*
 * Values of rbd_device.lock_state.  __rbd_is_lock_owner() counts both
 * LOCKED and RELEASING as owning the lock.
 */
enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};
367
/* WatchNotify::ClientId -- the type of rbd_device.owner_cid */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};
373
/* Embedded in struct rbd_device as ->mapping */
struct rbd_mapping {
	u64                     size;	/* NOTE(review): presumably the mapped size in bytes -- confirm */
};
377
/*
 * a single device
 *
 * Bits in ->flags are numbered by enum rbd_dev_flags.
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	/* lock_rwsem must be held to inspect lock_state (__rbd_is_lock_owner()) */
	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	spinlock_t		lock_lists_lock;
	struct list_head	acquiring_list;
	struct list_head	running_list;
	struct completion	acquire_wait;
	int			acquire_err;
	struct completion	releasing_wait;

	spinlock_t		object_map_lock;
	u8			*object_map;
	u64			object_map_size;	/* in objects */
	u64			object_map_flags;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;		/* pinned per open in rbd_open() */
	unsigned long		open_count;	/* protected by lock */
};
453
/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 *
 * These are bit numbers, tested with test_bit() (see rbd_is_ro() and
 * rbd_open()).
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* rbd_dev_device_setup() ran */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
};
464
/* Held by rbd_get_client() around client lookup and creation */
static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* rbd_client_list_lock is taken around every rbd_client_list access in this file */
static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/* Statically allocated snapshot context whose refcount starts at 1 */
static struct ceph_snap_context rbd_empty_snapc = {
	.nref = REFCOUNT_INIT(1),
};
486
/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 *
 * Exposed with mode 0444.  When false, rbd_bus_is_visible() hides the
 * add_single_major/remove_single_major bus attributes.
 */
static bool single_major = true;
module_param(single_major, bool, 0444);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
493
/*
 * Forward declarations.  NOTE(review): the *_store() handlers are
 * presumably what the BUS_ATTR_WO() definitions further down bind to;
 * their bodies are not in this part of the file.
 */
static ssize_t add_store(const struct bus_type *bus, const char *buf, size_t count);
static ssize_t remove_store(const struct bus_type *bus, const char *buf,
			    size_t count);
static ssize_t add_single_major_store(const struct bus_type *bus, const char *buf,
				      size_t count);
static ssize_t remove_single_major_store(const struct bus_type *bus, const char *buf,
					 size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
502
503static int rbd_dev_id_to_minor(int dev_id)
504{
505	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
506}
507
508static int minor_to_rbd_dev_id(int minor)
509{
510	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
511}
512
513static bool rbd_is_ro(struct rbd_device *rbd_dev)
514{
515	return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
516}
517
518static bool rbd_is_snap(struct rbd_device *rbd_dev)
519{
520	return rbd_dev->spec->snap_id != CEPH_NOSNAP;
521}
522
523static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
524{
525	lockdep_assert_held(&rbd_dev->lock_rwsem);
526
527	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
528	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
529}
530
531static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
532{
533	bool is_lock_owner;
534
535	down_read(&rbd_dev->lock_rwsem);
536	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
537	up_read(&rbd_dev->lock_rwsem);
538	return is_lock_owner;
539}
540
541static ssize_t supported_features_show(const struct bus_type *bus, char *buf)
542{
543	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
544}
545
/*
 * Bus-level attributes: four write-only control files and one
 * read-only file reporting the supported feature mask.
 */
static BUS_ATTR_WO(add);
static BUS_ATTR_WO(remove);
static BUS_ATTR_WO(add_single_major);
static BUS_ATTR_WO(remove_single_major);
static BUS_ATTR_RO(supported_features);

/* NULL-terminated list handed to rbd_bus_group; visibility is filtered by rbd_bus_is_visible() */
static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};
560
561static umode_t rbd_bus_is_visible(struct kobject *kobj,
562				  struct attribute *attr, int index)
563{
564	if (!single_major &&
565	    (attr == &bus_attr_add_single_major.attr ||
566	     attr == &bus_attr_remove_single_major.attr))
567		return 0;
568
569	return attr->mode;
570}
571
/* Attribute group for the bus: the attrs above, filtered by rbd_bus_is_visible() */
static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

/* The "rbd" bus; rbd_bus_groups comes from __ATTRIBUTE_GROUPS(rbd_bus) above */
static const struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};
582
/*
 * Release callback for rbd_root_dev.  Deliberately empty: rbd_root_dev
 * is a statically allocated object, so there is nothing to free here.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
586
/* Static device named "rbd" with a no-op release callback */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
591
592static __printf(2, 3)
593void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
594{
595	struct va_format vaf;
596	va_list args;
597
598	va_start(args, fmt);
599	vaf.fmt = fmt;
600	vaf.va = &args;
601
602	if (!rbd_dev)
603		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
604	else if (rbd_dev->disk)
605		printk(KERN_WARNING "%s: %s: %pV\n",
606			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
607	else if (rbd_dev->spec && rbd_dev->spec->image_name)
608		printk(KERN_WARNING "%s: image %s: %pV\n",
609			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
610	else if (rbd_dev->spec && rbd_dev->spec->image_id)
611		printk(KERN_WARNING "%s: id %s: %pV\n",
612			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
613	else	/* punt */
614		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
615			RBD_DRV_NAME, rbd_dev, &vaf);
616	va_end(args);
617}
618
/*
 * rbd_assert(expr): with RBD_DEBUG defined, print the failing
 * expression together with the function name and line number, then
 * BUG().  Without RBD_DEBUG it expands to a no-op expression.
 *
 * The checking variant is wrapped in do { } while (0) so the macro is
 * a single statement and stays correct inside an unbraced if/else.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
631
/* Forward declarations of helpers that are used before their definitions */
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev,
				     struct rbd_image_header *header);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size);
static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);

/* Request state machine entry points */
static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
645
646/*
647 * Return true if nothing else is pending.
648 */
649static bool pending_result_dec(struct pending_result *pending, int *result)
650{
651	rbd_assert(pending->num_pending > 0);
652
653	if (*result && !pending->result)
654		pending->result = *result;
655	if (--pending->num_pending)
656		return false;
657
658	*result = pending->result;
659	return true;
660}
661
662static int rbd_open(struct gendisk *disk, blk_mode_t mode)
663{
664	struct rbd_device *rbd_dev = disk->private_data;
665	bool removing = false;
666
667	spin_lock_irq(&rbd_dev->lock);
668	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
669		removing = true;
670	else
671		rbd_dev->open_count++;
672	spin_unlock_irq(&rbd_dev->lock);
673	if (removing)
674		return -ENOENT;
675
676	(void) get_device(&rbd_dev->dev);
677
678	return 0;
679}
680
681static void rbd_release(struct gendisk *disk)
682{
683	struct rbd_device *rbd_dev = disk->private_data;
684	unsigned long open_count_before;
685
686	spin_lock_irq(&rbd_dev->lock);
687	open_count_before = rbd_dev->open_count--;
688	spin_unlock_irq(&rbd_dev->lock);
689	rbd_assert(open_count_before > 0);
690
691	put_device(&rbd_dev->dev);
692}
693
/* Block device operations: only open and release are provided */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
699
700/*
701 * Initialize an rbd client instance.  Success or not, this function
702 * consumes ceph_opts.  Caller holds client_mutex.
703 */
704static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
705{
706	struct rbd_client *rbdc;
707	int ret = -ENOMEM;
708
709	dout("%s:\n", __func__);
710	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
711	if (!rbdc)
712		goto out_opt;
713
714	kref_init(&rbdc->kref);
715	INIT_LIST_HEAD(&rbdc->node);
716
717	rbdc->client = ceph_create_client(ceph_opts, rbdc);
718	if (IS_ERR(rbdc->client))
719		goto out_rbdc;
720	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
721
722	ret = ceph_open_session(rbdc->client);
723	if (ret < 0)
724		goto out_client;
725
726	spin_lock(&rbd_client_list_lock);
727	list_add_tail(&rbdc->node, &rbd_client_list);
728	spin_unlock(&rbd_client_list_lock);
729
730	dout("%s: rbdc %p\n", __func__, rbdc);
731
732	return rbdc;
733out_client:
734	ceph_destroy_client(rbdc->client);
735out_rbdc:
736	kfree(rbdc);
737out_opt:
738	if (ceph_opts)
739		ceph_destroy_options(ceph_opts);
740	dout("%s: error %d\n", __func__, ret);
741
742	return ERR_PTR(ret);
743}
744
745static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
746{
747	kref_get(&rbdc->kref);
748
749	return rbdc;
750}
751
752/*
753 * Find a ceph client with specific addr and configuration.  If
754 * found, bump its reference count.
755 */
756static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
757{
758	struct rbd_client *rbdc = NULL, *iter;
759
760	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
761		return NULL;
762
763	spin_lock(&rbd_client_list_lock);
764	list_for_each_entry(iter, &rbd_client_list, node) {
765		if (!ceph_compare_options(ceph_opts, iter->client)) {
766			__rbd_get_client(iter);
767
768			rbdc = iter;
769			break;
770		}
771	}
772	spin_unlock(&rbd_client_list_lock);
773
774	return rbdc;
775}
776
/*
 * (Per device) rbd map options
 *
 * Token values referenced by the rbd_parameters[] table, grouped by
 * argument kind (see the marker comments inside the enum).
 */
enum {
	Opt_queue_depth,
	Opt_alloc_size,
	Opt_lock_timeout,
	/* int args above */
	Opt_pool_ns,
	Opt_compression_hint,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_notrim,
};
794
/* Values the "compression_hint" option can take, see rbd_param_compression_hint[] */
enum {
	Opt_compression_hint_none,
	Opt_compression_hint_compressible,
	Opt_compression_hint_incompressible,
};
800
/* Maps the accepted "compression_hint" strings to their enum values; empty entry terminates */
static const struct constant_table rbd_param_compression_hint[] = {
	{"none",		Opt_compression_hint_none},
	{"compressible",	Opt_compression_hint_compressible},
	{"incompressible",	Opt_compression_hint_incompressible},
	{}
};
807
/*
 * Option table: name, argument kind and token for each per-device map
 * option.  "ro"/"rw" map to the same tokens as "read_only"/"read_write".
 * The empty entry terminates the table.
 */
static const struct fs_parameter_spec rbd_parameters[] = {
	fsparam_u32	("alloc_size",			Opt_alloc_size),
	fsparam_enum	("compression_hint",		Opt_compression_hint,
			 rbd_param_compression_hint),
	fsparam_flag	("exclusive",			Opt_exclusive),
	fsparam_flag	("lock_on_read",		Opt_lock_on_read),
	fsparam_u32	("lock_timeout",		Opt_lock_timeout),
	fsparam_flag	("notrim",			Opt_notrim),
	fsparam_string	("_pool_ns",			Opt_pool_ns),
	fsparam_u32	("queue_depth",			Opt_queue_depth),
	fsparam_flag	("read_only",			Opt_read_only),
	fsparam_flag	("read_write",			Opt_read_write),
	fsparam_flag	("ro",				Opt_read_only),
	fsparam_flag	("rw",				Opt_read_write),
	{}
};
824
/*
 * Parsed per-device map options (rbd_device.opts points at one).
 * NOTE(review): defaults are presumably the RBD_*_DEFAULT macros
 * defined right after this structure -- confirm in the parser.
 */
struct rbd_options {
	int	queue_depth;
	int	alloc_size;
	unsigned long	lock_timeout;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
	bool	trim;

	u32 alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
};
836
/* Default values, one per field of struct rbd_options that has a default */
#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_DEFAULT_RQ
#define RBD_ALLOC_SIZE_DEFAULT	(64 * 1024)	/* 64 KiB */
#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false
#define RBD_TRIM_DEFAULT	true
844
/* Groups the three objects that option parsing works on */
struct rbd_parse_opts_ctx {
	struct rbd_spec		*spec;
	struct ceph_options	*copts;
	struct rbd_options	*opts;
};
850
851static char* obj_op_name(enum obj_operation_type op_type)
852{
853	switch (op_type) {
854	case OBJ_OP_READ:
855		return "read";
856	case OBJ_OP_WRITE:
857		return "write";
858	case OBJ_OP_DISCARD:
859		return "discard";
860	case OBJ_OP_ZEROOUT:
861		return "zeroout";
862	default:
863		return "???";
864	}
865}
866
867/*
868 * Destroy ceph client
869 *
870 * Caller must hold rbd_client_list_lock.
871 */
872static void rbd_client_release(struct kref *kref)
873{
874	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
875
876	dout("%s: rbdc %p\n", __func__, rbdc);
877	spin_lock(&rbd_client_list_lock);
878	list_del(&rbdc->node);
879	spin_unlock(&rbd_client_list_lock);
880
881	ceph_destroy_client(rbdc->client);
882	kfree(rbdc);
883}
884
885/*
886 * Drop reference to ceph client node. If it's not referenced anymore, release
887 * it.
888 */
889static void rbd_put_client(struct rbd_client *rbdc)
890{
891	if (rbdc)
892		kref_put(&rbdc->kref, rbd_client_release);
893}
894
895/*
896 * Get a ceph client with specific addr and configuration, if one does
897 * not exist create it.  Either way, ceph_opts is consumed by this
898 * function.
899 */
900static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
901{
902	struct rbd_client *rbdc;
903	int ret;
904
905	mutex_lock(&client_mutex);
906	rbdc = rbd_client_find(ceph_opts);
907	if (rbdc) {
908		ceph_destroy_options(ceph_opts);
909
910		/*
911		 * Using an existing client.  Make sure ->pg_pools is up to
912		 * date before we look up the pool id in do_rbd_add().
913		 */
914		ret = ceph_wait_for_latest_osdmap(rbdc->client,
915					rbdc->client->options->mount_timeout);
916		if (ret) {
917			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
918			rbd_put_client(rbdc);
919			rbdc = ERR_PTR(ret);
920		}
921	} else {
922		rbdc = rbd_client_create(ceph_opts);
923	}
924	mutex_unlock(&client_mutex);
925
926	return rbdc;
927}
928
929static bool rbd_image_format_valid(u32 image_format)
930{
931	return image_format == 1 || image_format == 2;
932}
933
934static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
935{
936	size_t size;
937	u32 snap_count;
938
939	/* The header has to start with the magic rbd header text */
940	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
941		return false;
942
943	/* The bio layer requires at least sector-sized I/O */
944
945	if (ondisk->options.order < SECTOR_SHIFT)
946		return false;
947
948	/* If we use u64 in a few spots we may be able to loosen this */
949
950	if (ondisk->options.order > 8 * sizeof (int) - 1)
951		return false;
952
953	/*
954	 * The size of a snapshot header has to fit in a size_t, and
955	 * that limits the number of snapshots.
956	 */
957	snap_count = le32_to_cpu(ondisk->snap_count);
958	size = SIZE_MAX - sizeof (struct ceph_snap_context);
959	if (snap_count > size / sizeof (__le64))
960		return false;
961
962	/*
963	 * Not only that, but the size of the entire the snapshot
964	 * header must also be representable in a size_t.
965	 */
966	size -= snap_count * sizeof (__le64);
967	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
968		return false;
969
970	return true;
971}
972
973/*
974 * returns the size of an object in the image
975 */
976static u32 rbd_obj_bytes(struct rbd_image_header *header)
977{
978	return 1U << header->obj_order;
979}
980
981static void rbd_init_layout(struct rbd_device *rbd_dev)
982{
983	if (rbd_dev->header.stripe_unit == 0 ||
984	    rbd_dev->header.stripe_count == 0) {
985		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
986		rbd_dev->header.stripe_count = 1;
987	}
988
989	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
990	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
991	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
992	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
993			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
994	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
995}
996
997static void rbd_image_header_cleanup(struct rbd_image_header *header)
998{
999	kfree(header->object_prefix);
1000	ceph_put_snap_context(header->snapc);
1001	kfree(header->snap_sizes);
1002	kfree(header->snap_names);
1003
1004	memset(header, 0, sizeof(*header));
1005}
1006
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 *
 * @header:     in-memory header to fill in
 * @ondisk:     format 1 on-disk header, followed in the same buffer by
 *              the snapshot names
 * @first_time: also set the one-time fields (object_prefix, obj_order)
 *
 * Everything is allocated up front; @header is only written once no
 * further failure is possible, so on error it is left untouched.
 * The snapc, snap_names and snap_sizes pointers in @header are simply
 * overwritten -- whatever they held before is not freed here.
 *
 * Returns 0 on success, -ENOMEM if an allocation fails, or -EIO if the
 * on-disk snap_names_len does not fit in a size_t.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				struct rbd_image_header_ondisk *ondisk,
				bool first_time)
{
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	/* any of these may still be NULL, depending on how far we got */
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}
1101
1102static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1103{
1104	const char *snap_name;
1105
1106	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1107
1108	/* Skip over names until we find the one we are looking for */
1109
1110	snap_name = rbd_dev->header.snap_names;
1111	while (which--)
1112		snap_name += strlen(snap_name) + 1;
1113
1114	return kstrdup(snap_name, GFP_KERNEL);
1115}
1116
1117/*
1118 * Snapshot id comparison function for use with qsort()/bsearch().
1119 * Note that result is for snapshots in *descending* order.
1120 */
1121static int snapid_compare_reverse(const void *s1, const void *s2)
1122{
1123	u64 snap_id1 = *(u64 *)s1;
1124	u64 snap_id2 = *(u64 *)s2;
1125
1126	if (snap_id1 < snap_id2)
1127		return 1;
1128	return snap_id1 == snap_id2 ? 0 : -1;
1129}
1130
1131/*
1132 * Search a snapshot context to see if the given snapshot id is
1133 * present.
1134 *
1135 * Returns the position of the snapshot id in the array if it's found,
1136 * or BAD_SNAP_INDEX otherwise.
1137 *
1138 * Note: The snapshot array is in kept sorted (by the osd) in
1139 * reverse order, highest snapshot id first.
1140 */
1141static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1142{
1143	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1144	u64 *found;
1145
1146	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1147				sizeof (snap_id), snapid_compare_reverse);
1148
1149	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1150}
1151
1152static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1153					u64 snap_id)
1154{
1155	u32 which;
1156	const char *snap_name;
1157
1158	which = rbd_dev_snap_index(rbd_dev, snap_id);
1159	if (which == BAD_SNAP_INDEX)
1160		return ERR_PTR(-ENOENT);
1161
1162	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1163	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1164}
1165
1166static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1167{
1168	if (snap_id == CEPH_NOSNAP)
1169		return RBD_SNAP_HEAD_NAME;
1170
1171	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1172	if (rbd_dev->image_format == 1)
1173		return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1174
1175	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1176}
1177
1178static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1179				u64 *snap_size)
1180{
1181	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1182	if (snap_id == CEPH_NOSNAP) {
1183		*snap_size = rbd_dev->header.image_size;
1184	} else if (rbd_dev->image_format == 1) {
1185		u32 which;
1186
1187		which = rbd_dev_snap_index(rbd_dev, snap_id);
1188		if (which == BAD_SNAP_INDEX)
1189			return -ENOENT;
1190
1191		*snap_size = rbd_dev->header.snap_sizes[which];
1192	} else {
1193		u64 size = 0;
1194		int ret;
1195
1196		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1197		if (ret)
1198			return ret;
1199
1200		*snap_size = size;
1201	}
1202	return 0;
1203}
1204
1205static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1206{
1207	u64 snap_id = rbd_dev->spec->snap_id;
1208	u64 size = 0;
1209	int ret;
1210
1211	ret = rbd_snap_size(rbd_dev, snap_id, &size);
1212	if (ret)
1213		return ret;
1214
1215	rbd_dev->mapping.size = size;
1216	return 0;
1217}
1218
1219static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1220{
1221	rbd_dev->mapping.size = 0;
1222}
1223
1224static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1225{
1226	struct ceph_bio_iter it = *bio_pos;
1227
1228	ceph_bio_iter_advance(&it, off);
1229	ceph_bio_iter_advance_step(&it, bytes, ({
1230		memzero_bvec(&bv);
1231	}));
1232}
1233
1234static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1235{
1236	struct ceph_bvec_iter it = *bvec_pos;
1237
1238	ceph_bvec_iter_advance(&it, off);
1239	ceph_bvec_iter_advance_step(&it, bytes, ({
1240		memzero_bvec(&bv);
1241	}));
1242}
1243
1244/*
1245 * Zero a range in @obj_req data buffer defined by a bio (list) or
1246 * (private) bio_vec array.
1247 *
1248 * @off is relative to the start of the data buffer.
1249 */
1250static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1251			       u32 bytes)
1252{
1253	dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1254
1255	switch (obj_req->img_request->data_type) {
1256	case OBJ_REQUEST_BIO:
1257		zero_bios(&obj_req->bio_pos, off, bytes);
1258		break;
1259	case OBJ_REQUEST_BVECS:
1260	case OBJ_REQUEST_OWN_BVECS:
1261		zero_bvecs(&obj_req->bvec_pos, off, bytes);
1262		break;
1263	default:
1264		BUG();
1265	}
1266}
1267
1268static void rbd_obj_request_destroy(struct kref *kref);
1269static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1270{
1271	rbd_assert(obj_request != NULL);
1272	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1273		kref_read(&obj_request->kref));
1274	kref_put(&obj_request->kref, rbd_obj_request_destroy);
1275}
1276
1277static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1278					struct rbd_obj_request *obj_request)
1279{
1280	rbd_assert(obj_request->img_request == NULL);
1281
1282	/* Image request now owns object's original reference */
1283	obj_request->img_request = img_request;
1284	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1285}
1286
1287static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1288					struct rbd_obj_request *obj_request)
1289{
1290	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1291	list_del(&obj_request->ex.oe_item);
1292	rbd_assert(obj_request->img_request == img_request);
1293	rbd_obj_request_put(obj_request);
1294}
1295
1296static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1297{
1298	struct rbd_obj_request *obj_req = osd_req->r_priv;
1299
1300	dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1301	     __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1302	     obj_req->ex.oe_off, obj_req->ex.oe_len);
1303	ceph_osdc_start_request(osd_req->r_osdc, osd_req);
1304}
1305
1306/*
1307 * The default/initial value for all image request flags is 0.  Each
1308 * is conditionally set to 1 at image request initialization time
1309 * and currently never change thereafter.
1310 */
1311static void img_request_layered_set(struct rbd_img_request *img_request)
1312{
1313	set_bit(IMG_REQ_LAYERED, &img_request->flags);
1314}
1315
1316static bool img_request_layered_test(struct rbd_img_request *img_request)
1317{
1318	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1319}
1320
1321static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1322{
1323	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1324
1325	return !obj_req->ex.oe_off &&
1326	       obj_req->ex.oe_len == rbd_dev->layout.object_size;
1327}
1328
1329static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1330{
1331	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1332
1333	return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1334					rbd_dev->layout.object_size;
1335}
1336
1337/*
1338 * Must be called after rbd_obj_calc_img_extents().
1339 */
1340static void rbd_obj_set_copyup_enabled(struct rbd_obj_request *obj_req)
1341{
1342	rbd_assert(obj_req->img_request->snapc);
1343
1344	if (obj_req->img_request->op_type == OBJ_OP_DISCARD) {
1345		dout("%s %p objno %llu discard\n", __func__, obj_req,
1346		     obj_req->ex.oe_objno);
1347		return;
1348	}
1349
1350	if (!obj_req->num_img_extents) {
1351		dout("%s %p objno %llu not overlapping\n", __func__, obj_req,
1352		     obj_req->ex.oe_objno);
1353		return;
1354	}
1355
1356	if (rbd_obj_is_entire(obj_req) &&
1357	    !obj_req->img_request->snapc->num_snaps) {
1358		dout("%s %p objno %llu entire\n", __func__, obj_req,
1359		     obj_req->ex.oe_objno);
1360		return;
1361	}
1362
1363	obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
1364}
1365
1366static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1367{
1368	return ceph_file_extents_bytes(obj_req->img_extents,
1369				       obj_req->num_img_extents);
1370}
1371
1372static bool rbd_img_is_write(struct rbd_img_request *img_req)
1373{
1374	switch (img_req->op_type) {
1375	case OBJ_OP_READ:
1376		return false;
1377	case OBJ_OP_WRITE:
1378	case OBJ_OP_DISCARD:
1379	case OBJ_OP_ZEROOUT:
1380		return true;
1381	default:
1382		BUG();
1383	}
1384}
1385
1386static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1387{
1388	struct rbd_obj_request *obj_req = osd_req->r_priv;
1389	int result;
1390
1391	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1392	     osd_req->r_result, obj_req);
1393
1394	/*
1395	 * Writes aren't allowed to return a data payload.  In some
1396	 * guarded write cases (e.g. stat + zero on an empty object)
1397	 * a stat response makes it through, but we don't care.
1398	 */
1399	if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1400		result = 0;
1401	else
1402		result = osd_req->r_result;
1403
1404	rbd_obj_handle_request(obj_req, result);
1405}
1406
1407static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1408{
1409	struct rbd_obj_request *obj_request = osd_req->r_priv;
1410	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1411	struct ceph_options *opt = rbd_dev->rbd_client->client->options;
1412
1413	osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
1414	osd_req->r_snapid = obj_request->img_request->snap_id;
1415}
1416
1417static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1418{
1419	struct rbd_obj_request *obj_request = osd_req->r_priv;
1420
1421	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1422	ktime_get_real_ts64(&osd_req->r_mtime);
1423	osd_req->r_data_offset = obj_request->ex.oe_off;
1424}
1425
1426static struct ceph_osd_request *
1427__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1428			  struct ceph_snap_context *snapc, int num_ops)
1429{
1430	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1431	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1432	struct ceph_osd_request *req;
1433	const char *name_format = rbd_dev->image_format == 1 ?
1434				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1435	int ret;
1436
1437	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1438	if (!req)
1439		return ERR_PTR(-ENOMEM);
1440
1441	list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1442	req->r_callback = rbd_osd_req_callback;
1443	req->r_priv = obj_req;
1444
1445	/*
1446	 * Data objects may be stored in a separate pool, but always in
1447	 * the same namespace in that pool as the header in its pool.
1448	 */
1449	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1450	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1451
1452	ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1453			       rbd_dev->header.object_prefix,
1454			       obj_req->ex.oe_objno);
1455	if (ret)
1456		return ERR_PTR(ret);
1457
1458	return req;
1459}
1460
1461static struct ceph_osd_request *
1462rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1463{
1464	rbd_assert(obj_req->img_request->snapc);
1465	return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1466					 num_ops);
1467}
1468
1469static struct rbd_obj_request *rbd_obj_request_create(void)
1470{
1471	struct rbd_obj_request *obj_request;
1472
1473	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1474	if (!obj_request)
1475		return NULL;
1476
1477	ceph_object_extent_init(&obj_request->ex);
1478	INIT_LIST_HEAD(&obj_request->osd_reqs);
1479	mutex_init(&obj_request->state_mutex);
1480	kref_init(&obj_request->kref);
1481
1482	dout("%s %p\n", __func__, obj_request);
1483	return obj_request;
1484}
1485
1486static void rbd_obj_request_destroy(struct kref *kref)
1487{
1488	struct rbd_obj_request *obj_request;
1489	struct ceph_osd_request *osd_req;
1490	u32 i;
1491
1492	obj_request = container_of(kref, struct rbd_obj_request, kref);
1493
1494	dout("%s: obj %p\n", __func__, obj_request);
1495
1496	while (!list_empty(&obj_request->osd_reqs)) {
1497		osd_req = list_first_entry(&obj_request->osd_reqs,
1498				    struct ceph_osd_request, r_private_item);
1499		list_del_init(&osd_req->r_private_item);
1500		ceph_osdc_put_request(osd_req);
1501	}
1502
1503	switch (obj_request->img_request->data_type) {
1504	case OBJ_REQUEST_NODATA:
1505	case OBJ_REQUEST_BIO:
1506	case OBJ_REQUEST_BVECS:
1507		break;		/* Nothing to do */
1508	case OBJ_REQUEST_OWN_BVECS:
1509		kfree(obj_request->bvec_pos.bvecs);
1510		break;
1511	default:
1512		BUG();
1513	}
1514
1515	kfree(obj_request->img_extents);
1516	if (obj_request->copyup_bvecs) {
1517		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1518			if (obj_request->copyup_bvecs[i].bv_page)
1519				__free_page(obj_request->copyup_bvecs[i].bv_page);
1520		}
1521		kfree(obj_request->copyup_bvecs);
1522	}
1523
1524	kmem_cache_free(rbd_obj_request_cache, obj_request);
1525}
1526
1527/* It's OK to call this for a device with no parent */
1528
1529static void rbd_spec_put(struct rbd_spec *spec);
1530static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1531{
1532	rbd_dev_remove_parent(rbd_dev);
1533	rbd_spec_put(rbd_dev->parent_spec);
1534	rbd_dev->parent_spec = NULL;
1535	rbd_dev->parent_overlap = 0;
1536}
1537
1538/*
1539 * Parent image reference counting is used to determine when an
1540 * image's parent fields can be safely torn down--after there are no
1541 * more in-flight requests to the parent image.  When the last
1542 * reference is dropped, cleaning them up is safe.
1543 */
1544static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1545{
1546	int counter;
1547
1548	if (!rbd_dev->parent_spec)
1549		return;
1550
1551	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1552	if (counter > 0)
1553		return;
1554
1555	/* Last reference; clean up parent data structures */
1556
1557	if (!counter)
1558		rbd_dev_unparent(rbd_dev);
1559	else
1560		rbd_warn(rbd_dev, "parent reference underflow");
1561}
1562
1563/*
1564 * If an image has a non-zero parent overlap, get a reference to its
1565 * parent.
1566 *
1567 * Returns true if the rbd device has a parent with a non-zero
1568 * overlap and a reference for it was successfully taken, or
1569 * false otherwise.
1570 */
1571static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1572{
1573	int counter = 0;
1574
1575	if (!rbd_dev->parent_spec)
1576		return false;
1577
1578	if (rbd_dev->parent_overlap)
1579		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1580
1581	if (counter < 0)
1582		rbd_warn(rbd_dev, "parent reference overflow");
1583
1584	return counter > 0;
1585}
1586
1587static void rbd_img_request_init(struct rbd_img_request *img_request,
1588				 struct rbd_device *rbd_dev,
1589				 enum obj_operation_type op_type)
1590{
1591	memset(img_request, 0, sizeof(*img_request));
1592
1593	img_request->rbd_dev = rbd_dev;
1594	img_request->op_type = op_type;
1595
1596	INIT_LIST_HEAD(&img_request->lock_item);
1597	INIT_LIST_HEAD(&img_request->object_extents);
1598	mutex_init(&img_request->state_mutex);
1599}
1600
1601/*
1602 * Only snap_id is captured here, for reads.  For writes, snapshot
1603 * context is captured in rbd_img_object_requests() after exclusive
1604 * lock is ensured to be held.
1605 */
1606static void rbd_img_capture_header(struct rbd_img_request *img_req)
1607{
1608	struct rbd_device *rbd_dev = img_req->rbd_dev;
1609
1610	lockdep_assert_held(&rbd_dev->header_rwsem);
1611
1612	if (!rbd_img_is_write(img_req))
1613		img_req->snap_id = rbd_dev->spec->snap_id;
1614
1615	if (rbd_dev_parent_get(rbd_dev))
1616		img_request_layered_set(img_req);
1617}
1618
1619static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1620{
1621	struct rbd_obj_request *obj_request;
1622	struct rbd_obj_request *next_obj_request;
1623
1624	dout("%s: img %p\n", __func__, img_request);
1625
1626	WARN_ON(!list_empty(&img_request->lock_item));
1627	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1628		rbd_img_obj_request_del(img_request, obj_request);
1629
1630	if (img_request_layered_test(img_request))
1631		rbd_dev_parent_put(img_request->rbd_dev);
1632
1633	if (rbd_img_is_write(img_request))
1634		ceph_put_snap_context(img_request->snapc);
1635
1636	if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1637		kmem_cache_free(rbd_img_request_cache, img_request);
1638}
1639
1640#define BITS_PER_OBJ	2
1641#define OBJS_PER_BYTE	(BITS_PER_BYTE / BITS_PER_OBJ)
1642#define OBJ_MASK	((1 << BITS_PER_OBJ) - 1)
1643
1644static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1645				   u64 *index, u8 *shift)
1646{
1647	u32 off;
1648
1649	rbd_assert(objno < rbd_dev->object_map_size);
1650	*index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1651	*shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1652}
1653
1654static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1655{
1656	u64 index;
1657	u8 shift;
1658
1659	lockdep_assert_held(&rbd_dev->object_map_lock);
1660	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
1661	return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1662}
1663
1664static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1665{
1666	u64 index;
1667	u8 shift;
1668	u8 *p;
1669
1670	lockdep_assert_held(&rbd_dev->object_map_lock);
1671	rbd_assert(!(val & ~OBJ_MASK));
1672
1673	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
1674	p = &rbd_dev->object_map[index];
1675	*p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1676}
1677
1678static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1679{
1680	u8 state;
1681
1682	spin_lock(&rbd_dev->object_map_lock);
1683	state = __rbd_object_map_get(rbd_dev, objno);
1684	spin_unlock(&rbd_dev->object_map_lock);
1685	return state;
1686}
1687
1688static bool use_object_map(struct rbd_device *rbd_dev)
1689{
1690	/*
1691	 * An image mapped read-only can't use the object map -- it isn't
1692	 * loaded because the header lock isn't acquired.  Someone else can
1693	 * write to the image and update the object map behind our back.
1694	 *
1695	 * A snapshot can't be written to, so using the object map is always
1696	 * safe.
1697	 */
1698	if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1699		return false;
1700
1701	return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1702		!(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1703}
1704
1705static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1706{
1707	u8 state;
1708
1709	/* fall back to default logic if object map is disabled or invalid */
1710	if (!use_object_map(rbd_dev))
1711		return true;
1712
1713	state = rbd_object_map_get(rbd_dev, objno);
1714	return state != OBJECT_NONEXISTENT;
1715}
1716
1717static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1718				struct ceph_object_id *oid)
1719{
1720	if (snap_id == CEPH_NOSNAP)
1721		ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1722				rbd_dev->spec->image_id);
1723	else
1724		ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1725				rbd_dev->spec->image_id, snap_id);
1726}
1727
1728static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1729{
1730	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1731	CEPH_DEFINE_OID_ONSTACK(oid);
1732	u8 lock_type;
1733	char *lock_tag;
1734	struct ceph_locker *lockers;
1735	u32 num_lockers;
1736	bool broke_lock = false;
1737	int ret;
1738
1739	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1740
1741again:
1742	ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1743			    CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1744	if (ret != -EBUSY || broke_lock) {
1745		if (ret == -EEXIST)
1746			ret = 0; /* already locked by myself */
1747		if (ret)
1748			rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1749		return ret;
1750	}
1751
1752	ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1753				 RBD_LOCK_NAME, &lock_type, &lock_tag,
1754				 &lockers, &num_lockers);
1755	if (ret) {
1756		if (ret == -ENOENT)
1757			goto again;
1758
1759		rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1760		return ret;
1761	}
1762
1763	kfree(lock_tag);
1764	if (num_lockers == 0)
1765		goto again;
1766
1767	rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1768		 ENTITY_NAME(lockers[0].id.name));
1769
1770	ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1771				  RBD_LOCK_NAME, lockers[0].id.cookie,
1772				  &lockers[0].id.name);
1773	ceph_free_lockers(lockers, num_lockers);
1774	if (ret) {
1775		if (ret == -ENOENT)
1776			goto again;
1777
1778		rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1779		return ret;
1780	}
1781
1782	broke_lock = true;
1783	goto again;
1784}
1785
1786static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1787{
1788	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1789	CEPH_DEFINE_OID_ONSTACK(oid);
1790	int ret;
1791
1792	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1793
1794	ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1795			      "");
1796	if (ret && ret != -ENOENT)
1797		rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1798}
1799
1800static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1801{
1802	u8 struct_v;
1803	u32 struct_len;
1804	u32 header_len;
1805	void *header_end;
1806	int ret;
1807
1808	ceph_decode_32_safe(p, end, header_len, e_inval);
1809	header_end = *p + header_len;
1810
1811	ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1812				  &struct_len);
1813	if (ret)
1814		return ret;
1815
1816	ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1817
1818	*p = header_end;
1819	return 0;
1820
1821e_inval:
1822	return -EINVAL;
1823}
1824
1825static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1826{
1827	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1828	CEPH_DEFINE_OID_ONSTACK(oid);
1829	struct page **pages;
1830	void *p, *end;
1831	size_t reply_len;
1832	u64 num_objects;
1833	u64 object_map_bytes;
1834	u64 object_map_size;
1835	int num_pages;
1836	int ret;
1837
1838	rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1839
1840	num_objects = ceph_get_num_objects(&rbd_dev->layout,
1841					   rbd_dev->mapping.size);
1842	object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1843					    BITS_PER_BYTE);
1844	num_pages = calc_pages_for(0, object_map_bytes) + 1;
1845	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1846	if (IS_ERR(pages))
1847		return PTR_ERR(pages);
1848
1849	reply_len = num_pages * PAGE_SIZE;
1850	rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1851	ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1852			     "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1853			     NULL, 0, pages, &reply_len);
1854	if (ret)
1855		goto out;
1856
1857	p = page_address(pages[0]);
1858	end = p + min(reply_len, (size_t)PAGE_SIZE);
1859	ret = decode_object_map_header(&p, end, &object_map_size);
1860	if (ret)
1861		goto out;
1862
1863	if (object_map_size != num_objects) {
1864		rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1865			 object_map_size, num_objects);
1866		ret = -EINVAL;
1867		goto out;
1868	}
1869
1870	if (offset_in_page(p) + object_map_bytes > reply_len) {
1871		ret = -EINVAL;
1872		goto out;
1873	}
1874
1875	rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1876	if (!rbd_dev->object_map) {
1877		ret = -ENOMEM;
1878		goto out;
1879	}
1880
1881	rbd_dev->object_map_size = object_map_size;
1882	ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1883				   offset_in_page(p), object_map_bytes);
1884
1885out:
1886	ceph_release_page_vector(pages, num_pages);
1887	return ret;
1888}
1889
1890static void rbd_object_map_free(struct rbd_device *rbd_dev)
1891{
1892	kvfree(rbd_dev->object_map);
1893	rbd_dev->object_map = NULL;
1894	rbd_dev->object_map_size = 0;
1895}
1896
1897static int rbd_object_map_load(struct rbd_device *rbd_dev)
1898{
1899	int ret;
1900
1901	ret = __rbd_object_map_load(rbd_dev);
1902	if (ret)
1903		return ret;
1904
1905	ret = rbd_dev_v2_get_flags(rbd_dev);
1906	if (ret) {
1907		rbd_object_map_free(rbd_dev);
1908		return ret;
1909	}
1910
1911	if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1912		rbd_warn(rbd_dev, "object map is invalid");
1913
1914	return 0;
1915}
1916
/*
 * Lock the object map and load it.  If loading fails the lock is
 * released again.
 *
 * Returns 0 on success or a negative error code.
 */
static int rbd_object_map_open(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_object_map_lock(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_object_map_load(rbd_dev);
	if (ret)
		rbd_object_map_unlock(rbd_dev);

	return ret;
}
1933
/*
 * Counterpart of rbd_object_map_open(): free the in-memory object
 * map, then release the object map lock.
 */
static void rbd_object_map_close(struct rbd_device *rbd_dev)
{
	rbd_object_map_free(rbd_dev);
	rbd_object_map_unlock(rbd_dev);
}
1939
1940/*
1941 * This function needs snap_id (or more precisely just something to
1942 * distinguish between HEAD and snapshot object maps), new_state and
1943 * current_state that were passed to rbd_object_map_update().
1944 *
1945 * To avoid allocating and stashing a context we piggyback on the OSD
1946 * request.  A HEAD update has two ops (assert_locked).  For new_state
1947 * and current_state we decode our own object_map_update op, encoded in
1948 * rbd_cls_object_map_update().
1949 */
1950static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
1951					struct ceph_osd_request *osd_req)
1952{
1953	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1954	struct ceph_osd_data *osd_data;
1955	u64 objno;
1956	u8 state, new_state, current_state;
1957	bool has_current_state;
1958	void *p;
1959
1960	if (osd_req->r_result)
1961		return osd_req->r_result;
1962
1963	/*
1964	 * Nothing to do for a snapshot object map.
1965	 */
1966	if (osd_req->r_num_ops == 1)
1967		return 0;
1968
1969	/*
1970	 * Update in-memory HEAD object map.
1971	 */
1972	rbd_assert(osd_req->r_num_ops == 2);
1973	osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
1974	rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
1975
1976	p = page_address(osd_data->pages[0]);
1977	objno = ceph_decode_64(&p);
1978	rbd_assert(objno == obj_req->ex.oe_objno);
1979	rbd_assert(ceph_decode_64(&p) == objno + 1);
1980	new_state = ceph_decode_8(&p);
1981	has_current_state = ceph_decode_8(&p);
1982	if (has_current_state)
1983		current_state = ceph_decode_8(&p);
1984
1985	spin_lock(&rbd_dev->object_map_lock);
1986	state = __rbd_object_map_get(rbd_dev, objno);
1987	if (!has_current_state || current_state == state ||
1988	    (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
1989		__rbd_object_map_set(rbd_dev, objno, new_state);
1990	spin_unlock(&rbd_dev->object_map_lock);
1991
1992	return 0;
1993}
1994
1995static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
1996{
1997	struct rbd_obj_request *obj_req = osd_req->r_priv;
1998	int result;
1999
2000	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
2001	     osd_req->r_result, obj_req);
2002
2003	result = rbd_object_map_update_finish(obj_req, osd_req);
2004	rbd_obj_handle_request(obj_req, result);
2005}
2006
2007static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
2008{
2009	u8 state = rbd_object_map_get(rbd_dev, objno);
2010
2011	if (state == new_state ||
2012	    (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2013	    (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2014		return false;
2015
2016	return true;
2017}
2018
2019static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2020				     int which, u64 objno, u8 new_state,
2021				     const u8 *current_state)
2022{
2023	struct page **pages;
2024	void *p, *start;
2025	int ret;
2026
2027	ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2028	if (ret)
2029		return ret;
2030
2031	pages = ceph_alloc_page_vector(1, GFP_NOIO);
2032	if (IS_ERR(pages))
2033		return PTR_ERR(pages);
2034
2035	p = start = page_address(pages[0]);
2036	ceph_encode_64(&p, objno);
2037	ceph_encode_64(&p, objno + 1);
2038	ceph_encode_8(&p, new_state);
2039	if (current_state) {
2040		ceph_encode_8(&p, 1);
2041		ceph_encode_8(&p, *current_state);
2042	} else {
2043		ceph_encode_8(&p, 0);
2044	}
2045
2046	osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2047					  false, true);
2048	return 0;
2049}
2050
2051/*
2052 * Return:
2053 *   0 - object map update sent
2054 *   1 - object map update isn't needed
2055 *  <0 - error
2056 */
2057static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2058				 u8 new_state, const u8 *current_state)
2059{
2060	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2061	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2062	struct ceph_osd_request *req;
2063	int num_ops = 1;
2064	int which = 0;
2065	int ret;
2066
2067	if (snap_id == CEPH_NOSNAP) {
2068		if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2069			return 1;
2070
2071		num_ops++; /* assert_locked */
2072	}
2073
2074	req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2075	if (!req)
2076		return -ENOMEM;
2077
2078	list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2079	req->r_callback = rbd_object_map_callback;
2080	req->r_priv = obj_req;
2081
2082	rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2083	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2084	req->r_flags = CEPH_OSD_FLAG_WRITE;
2085	ktime_get_real_ts64(&req->r_mtime);
2086
2087	if (snap_id == CEPH_NOSNAP) {
2088		/*
2089		 * Protect against possible race conditions during lock
2090		 * ownership transitions.
2091		 */
2092		ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2093					     CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2094		if (ret)
2095			return ret;
2096	}
2097
2098	ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2099					new_state, current_state);
2100	if (ret)
2101		return ret;
2102
2103	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2104	if (ret)
2105		return ret;
2106
2107	ceph_osdc_start_request(osdc, req);
2108	return 0;
2109}
2110
2111static void prune_extents(struct ceph_file_extent *img_extents,
2112			  u32 *num_img_extents, u64 overlap)
2113{
2114	u32 cnt = *num_img_extents;
2115
2116	/* drop extents completely beyond the overlap */
2117	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2118		cnt--;
2119
2120	if (cnt) {
2121		struct ceph_file_extent *ex = &img_extents[cnt - 1];
2122
2123		/* trim final overlapping extent */
2124		if (ex->fe_off + ex->fe_len > overlap)
2125			ex->fe_len = overlap - ex->fe_off;
2126	}
2127
2128	*num_img_extents = cnt;
2129}
2130
2131/*
2132 * Determine the byte range(s) covered by either just the object extent
2133 * or the entire object in the parent image.
2134 */
2135static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2136				    bool entire)
2137{
2138	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2139	int ret;
2140
2141	if (!rbd_dev->parent_overlap)
2142		return 0;
2143
2144	ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2145				  entire ? 0 : obj_req->ex.oe_off,
2146				  entire ? rbd_dev->layout.object_size :
2147							obj_req->ex.oe_len,
2148				  &obj_req->img_extents,
2149				  &obj_req->num_img_extents);
2150	if (ret)
2151		return ret;
2152
2153	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2154		      rbd_dev->parent_overlap);
2155	return 0;
2156}
2157
2158static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2159{
2160	struct rbd_obj_request *obj_req = osd_req->r_priv;
2161
2162	switch (obj_req->img_request->data_type) {
2163	case OBJ_REQUEST_BIO:
2164		osd_req_op_extent_osd_data_bio(osd_req, which,
2165					       &obj_req->bio_pos,
2166					       obj_req->ex.oe_len);
2167		break;
2168	case OBJ_REQUEST_BVECS:
2169	case OBJ_REQUEST_OWN_BVECS:
2170		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2171							obj_req->ex.oe_len);
2172		rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2173		osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2174						    &obj_req->bvec_pos);
2175		break;
2176	default:
2177		BUG();
2178	}
2179}
2180
2181static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2182{
2183	struct page **pages;
2184
2185	/*
2186	 * The response data for a STAT call consists of:
2187	 *     le64 length;
2188	 *     struct {
2189	 *         le32 tv_sec;
2190	 *         le32 tv_nsec;
2191	 *     } mtime;
2192	 */
2193	pages = ceph_alloc_page_vector(1, GFP_NOIO);
2194	if (IS_ERR(pages))
2195		return PTR_ERR(pages);
2196
2197	osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2198	osd_req_op_raw_data_in_pages(osd_req, which, pages,
2199				     8 + sizeof(struct ceph_timespec),
2200				     0, false, true);
2201	return 0;
2202}
2203
2204static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2205				u32 bytes)
2206{
2207	struct rbd_obj_request *obj_req = osd_req->r_priv;
2208	int ret;
2209
2210	ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2211	if (ret)
2212		return ret;
2213
2214	osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2215					  obj_req->copyup_bvec_count, bytes);
2216	return 0;
2217}
2218
/*
 * Prepare the read state machine of @obj_req: it starts out in
 * RBD_OBJ_READ_START.  Always returns 0.
 */
static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
{
	obj_req->read_state = RBD_OBJ_READ_START;
	return 0;
}
2224
2225static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2226				      int which)
2227{
2228	struct rbd_obj_request *obj_req = osd_req->r_priv;
2229	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2230	u16 opcode;
2231
2232	if (!use_object_map(rbd_dev) ||
2233	    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2234		osd_req_op_alloc_hint_init(osd_req, which++,
2235					   rbd_dev->layout.object_size,
2236					   rbd_dev->layout.object_size,
2237					   rbd_dev->opts->alloc_hint_flags);
2238	}
2239
2240	if (rbd_obj_is_entire(obj_req))
2241		opcode = CEPH_OSD_OP_WRITEFULL;
2242	else
2243		opcode = CEPH_OSD_OP_WRITE;
2244
2245	osd_req_op_extent_init(osd_req, which, opcode,
2246			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2247	rbd_osd_setup_data(osd_req, which);
2248}
2249
2250static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2251{
2252	int ret;
2253
2254	/* reverse map the entire object onto the parent */
2255	ret = rbd_obj_calc_img_extents(obj_req, true);
2256	if (ret)
2257		return ret;
2258
2259	obj_req->write_state = RBD_OBJ_WRITE_START;
2260	return 0;
2261}
2262
2263static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2264{
2265	return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2266					  CEPH_OSD_OP_ZERO;
2267}
2268
2269static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2270					int which)
2271{
2272	struct rbd_obj_request *obj_req = osd_req->r_priv;
2273
2274	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2275		rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2276		osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2277	} else {
2278		osd_req_op_extent_init(osd_req, which,
2279				       truncate_or_zero_opcode(obj_req),
2280				       obj_req->ex.oe_off, obj_req->ex.oe_len,
2281				       0, 0);
2282	}
2283}
2284
2285static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2286{
2287	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2288	u64 off, next_off;
2289	int ret;
2290
2291	/*
2292	 * Align the range to alloc_size boundary and punt on discards
2293	 * that are too small to free up any space.
2294	 *
2295	 * alloc_size == object_size && is_tail() is a special case for
2296	 * filestore with filestore_punch_hole = false, needed to allow
2297	 * truncate (in addition to delete).
2298	 */
2299	if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2300	    !rbd_obj_is_tail(obj_req)) {
2301		off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2302		next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2303				      rbd_dev->opts->alloc_size);
2304		if (off >= next_off)
2305			return 1;
2306
2307		dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2308		     obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2309		     off, next_off - off);
2310		obj_req->ex.oe_off = off;
2311		obj_req->ex.oe_len = next_off - off;
2312	}
2313
2314	/* reverse map the entire object onto the parent */
2315	ret = rbd_obj_calc_img_extents(obj_req, true);
2316	if (ret)
2317		return ret;
2318
2319	obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2320	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2321		obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2322
2323	obj_req->write_state = RBD_OBJ_WRITE_START;
2324	return 0;
2325}
2326
2327static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2328					int which)
2329{
2330	struct rbd_obj_request *obj_req = osd_req->r_priv;
2331	u16 opcode;
2332
2333	if (rbd_obj_is_entire(obj_req)) {
2334		if (obj_req->num_img_extents) {
2335			if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2336				osd_req_op_init(osd_req, which++,
2337						CEPH_OSD_OP_CREATE, 0);
2338			opcode = CEPH_OSD_OP_TRUNCATE;
2339		} else {
2340			rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2341			osd_req_op_init(osd_req, which++,
2342					CEPH_OSD_OP_DELETE, 0);
2343			opcode = 0;
2344		}
2345	} else {
2346		opcode = truncate_or_zero_opcode(obj_req);
2347	}
2348
2349	if (opcode)
2350		osd_req_op_extent_init(osd_req, which, opcode,
2351				       obj_req->ex.oe_off, obj_req->ex.oe_len,
2352				       0, 0);
2353}
2354
2355static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2356{
2357	int ret;
2358
2359	/* reverse map the entire object onto the parent */
2360	ret = rbd_obj_calc_img_extents(obj_req, true);
2361	if (ret)
2362		return ret;
2363
2364	if (!obj_req->num_img_extents) {
2365		obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2366		if (rbd_obj_is_entire(obj_req))
2367			obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2368	}
2369
2370	obj_req->write_state = RBD_OBJ_WRITE_START;
2371	return 0;
2372}
2373
2374static int count_write_ops(struct rbd_obj_request *obj_req)
2375{
2376	struct rbd_img_request *img_req = obj_req->img_request;
2377
2378	switch (img_req->op_type) {
2379	case OBJ_OP_WRITE:
2380		if (!use_object_map(img_req->rbd_dev) ||
2381		    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2382			return 2; /* setallochint + write/writefull */
2383
2384		return 1; /* write/writefull */
2385	case OBJ_OP_DISCARD:
2386		return 1; /* delete/truncate/zero */
2387	case OBJ_OP_ZEROOUT:
2388		if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2389		    !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2390			return 2; /* create + truncate */
2391
2392		return 1; /* delete/truncate/zero */
2393	default:
2394		BUG();
2395	}
2396}
2397
2398static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2399				    int which)
2400{
2401	struct rbd_obj_request *obj_req = osd_req->r_priv;
2402
2403	switch (obj_req->img_request->op_type) {
2404	case OBJ_OP_WRITE:
2405		__rbd_osd_setup_write_ops(osd_req, which);
2406		break;
2407	case OBJ_OP_DISCARD:
2408		__rbd_osd_setup_discard_ops(osd_req, which);
2409		break;
2410	case OBJ_OP_ZEROOUT:
2411		__rbd_osd_setup_zeroout_ops(osd_req, which);
2412		break;
2413	default:
2414		BUG();
2415	}
2416}
2417
2418/*
2419 * Prune the list of object requests (adjust offset and/or length, drop
2420 * redundant requests).  Prepare object request state machines and image
2421 * request state machine for execution.
2422 */
2423static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2424{
2425	struct rbd_obj_request *obj_req, *next_obj_req;
2426	int ret;
2427
2428	for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2429		switch (img_req->op_type) {
2430		case OBJ_OP_READ:
2431			ret = rbd_obj_init_read(obj_req);
2432			break;
2433		case OBJ_OP_WRITE:
2434			ret = rbd_obj_init_write(obj_req);
2435			break;
2436		case OBJ_OP_DISCARD:
2437			ret = rbd_obj_init_discard(obj_req);
2438			break;
2439		case OBJ_OP_ZEROOUT:
2440			ret = rbd_obj_init_zeroout(obj_req);
2441			break;
2442		default:
2443			BUG();
2444		}
2445		if (ret < 0)
2446			return ret;
2447		if (ret > 0) {
2448			rbd_img_obj_request_del(img_req, obj_req);
2449			continue;
2450		}
2451	}
2452
2453	img_req->state = RBD_IMG_START;
2454	return 0;
2455}
2456
/*
 * Position within the data buffer that an image request is filled from:
 * either a bio iterator or a bio_vec iterator.
 */
union rbd_img_fill_iter {
	struct ceph_bio_iter	bio_iter;
	struct ceph_bvec_iter	bvec_iter;
};
2461
/*
 * Parameters for rbd_img_fill_request().
 */
struct rbd_img_fill_ctx {
	/* kind of data buffer; used as the data type on the nocopy path */
	enum obj_request_type	pos_type;
	/* starting position in the data buffer, never advanced */
	union rbd_img_fill_iter	*pos;
	/* working copy of *pos, reset before each pass over the extents */
	union rbd_img_fill_iter	iter;
	/* nocopy path: records each object request's starting position */
	ceph_object_extent_fn_t	set_pos_fn;
	/* copying path, first pass: counts bio_vecs per object request */
	ceph_object_extent_fn_t	count_fn;
	/* copying path, second pass: copies bio_vecs per object request */
	ceph_object_extent_fn_t	copy_fn;
};
2470
2471static struct ceph_object_extent *alloc_object_extent(void *arg)
2472{
2473	struct rbd_img_request *img_req = arg;
2474	struct rbd_obj_request *obj_req;
2475
2476	obj_req = rbd_obj_request_create();
2477	if (!obj_req)
2478		return NULL;
2479
2480	rbd_img_obj_request_add(img_req, obj_req);
2481	return &obj_req->ex;
2482}
2483
2484/*
2485 * While su != os && sc == 1 is technically not fancy (it's the same
2486 * layout as su == os && sc == 1), we can't use the nocopy path for it
2487 * because ->set_pos_fn() should be called only once per object.
2488 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2489 * treat su != os && sc == 1 as fancy.
2490 */
2491static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2492{
2493	return l->stripe_unit != l->object_size;
2494}
2495
2496static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2497				       struct ceph_file_extent *img_extents,
2498				       u32 num_img_extents,
2499				       struct rbd_img_fill_ctx *fctx)
2500{
2501	u32 i;
2502	int ret;
2503
2504	img_req->data_type = fctx->pos_type;
2505
2506	/*
2507	 * Create object requests and set each object request's starting
2508	 * position in the provided bio (list) or bio_vec array.
2509	 */
2510	fctx->iter = *fctx->pos;
2511	for (i = 0; i < num_img_extents; i++) {
2512		ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2513					   img_extents[i].fe_off,
2514					   img_extents[i].fe_len,
2515					   &img_req->object_extents,
2516					   alloc_object_extent, img_req,
2517					   fctx->set_pos_fn, &fctx->iter);
2518		if (ret)
2519			return ret;
2520	}
2521
2522	return __rbd_img_fill_request(img_req);
2523}
2524
2525/*
2526 * Map a list of image extents to a list of object extents, create the
2527 * corresponding object requests (normally each to a different object,
2528 * but not always) and add them to @img_req.  For each object request,
2529 * set up its data descriptor to point to the corresponding chunk(s) of
2530 * @fctx->pos data buffer.
2531 *
2532 * Because ceph_file_to_extents() will merge adjacent object extents
2533 * together, each object request's data descriptor may point to multiple
2534 * different chunks of @fctx->pos data buffer.
2535 *
2536 * @fctx->pos data buffer is assumed to be large enough.
2537 */
2538static int rbd_img_fill_request(struct rbd_img_request *img_req,
2539				struct ceph_file_extent *img_extents,
2540				u32 num_img_extents,
2541				struct rbd_img_fill_ctx *fctx)
2542{
2543	struct rbd_device *rbd_dev = img_req->rbd_dev;
2544	struct rbd_obj_request *obj_req;
2545	u32 i;
2546	int ret;
2547
2548	if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2549	    !rbd_layout_is_fancy(&rbd_dev->layout))
2550		return rbd_img_fill_request_nocopy(img_req, img_extents,
2551						   num_img_extents, fctx);
2552
2553	img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2554
2555	/*
2556	 * Create object requests and determine ->bvec_count for each object
2557	 * request.  Note that ->bvec_count sum over all object requests may
2558	 * be greater than the number of bio_vecs in the provided bio (list)
2559	 * or bio_vec array because when mapped, those bio_vecs can straddle
2560	 * stripe unit boundaries.
2561	 */
2562	fctx->iter = *fctx->pos;
2563	for (i = 0; i < num_img_extents; i++) {
2564		ret = ceph_file_to_extents(&rbd_dev->layout,
2565					   img_extents[i].fe_off,
2566					   img_extents[i].fe_len,
2567					   &img_req->object_extents,
2568					   alloc_object_extent, img_req,
2569					   fctx->count_fn, &fctx->iter);
2570		if (ret)
2571			return ret;
2572	}
2573
2574	for_each_obj_request(img_req, obj_req) {
2575		obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2576					      sizeof(*obj_req->bvec_pos.bvecs),
2577					      GFP_NOIO);
2578		if (!obj_req->bvec_pos.bvecs)
2579			return -ENOMEM;
2580	}
2581
2582	/*
2583	 * Fill in each object request's private bio_vec array, splitting and
2584	 * rearranging the provided bio_vecs in stripe unit chunks as needed.
2585	 */
2586	fctx->iter = *fctx->pos;
2587	for (i = 0; i < num_img_extents; i++) {
2588		ret = ceph_iterate_extents(&rbd_dev->layout,
2589					   img_extents[i].fe_off,
2590					   img_extents[i].fe_len,
2591					   &img_req->object_extents,
2592					   fctx->copy_fn, &fctx->iter);
2593		if (ret)
2594			return ret;
2595	}
2596
2597	return __rbd_img_fill_request(img_req);
2598}
2599
2600static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2601			       u64 off, u64 len)
2602{
2603	struct ceph_file_extent ex = { off, len };
2604	union rbd_img_fill_iter dummy = {};
2605	struct rbd_img_fill_ctx fctx = {
2606		.pos_type = OBJ_REQUEST_NODATA,
2607		.pos = &dummy,
2608	};
2609
2610	return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2611}
2612
2613static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2614{
2615	struct rbd_obj_request *obj_req =
2616	    container_of(ex, struct rbd_obj_request, ex);
2617	struct ceph_bio_iter *it = arg;
2618
2619	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2620	obj_req->bio_pos = *it;
2621	ceph_bio_iter_advance(it, bytes);
2622}
2623
2624static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2625{
2626	struct rbd_obj_request *obj_req =
2627	    container_of(ex, struct rbd_obj_request, ex);
2628	struct ceph_bio_iter *it = arg;
2629
2630	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2631	ceph_bio_iter_advance_step(it, bytes, ({
2632		obj_req->bvec_count++;
2633	}));
2634
2635}
2636
2637static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2638{
2639	struct rbd_obj_request *obj_req =
2640	    container_of(ex, struct rbd_obj_request, ex);
2641	struct ceph_bio_iter *it = arg;
2642
2643	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2644	ceph_bio_iter_advance_step(it, bytes, ({
2645		obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2646		obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2647	}));
2648}
2649
2650static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2651				   struct ceph_file_extent *img_extents,
2652				   u32 num_img_extents,
2653				   struct ceph_bio_iter *bio_pos)
2654{
2655	struct rbd_img_fill_ctx fctx = {
2656		.pos_type = OBJ_REQUEST_BIO,
2657		.pos = (union rbd_img_fill_iter *)bio_pos,
2658		.set_pos_fn = set_bio_pos,
2659		.count_fn = count_bio_bvecs,
2660		.copy_fn = copy_bio_bvecs,
2661	};
2662
2663	return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2664				    &fctx);
2665}
2666
2667static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2668				 u64 off, u64 len, struct bio *bio)
2669{
2670	struct ceph_file_extent ex = { off, len };
2671	struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2672
2673	return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2674}
2675
2676static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2677{
2678	struct rbd_obj_request *obj_req =
2679	    container_of(ex, struct rbd_obj_request, ex);
2680	struct ceph_bvec_iter *it = arg;
2681
2682	obj_req->bvec_pos = *it;
2683	ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2684	ceph_bvec_iter_advance(it, bytes);
2685}
2686
2687static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2688{
2689	struct rbd_obj_request *obj_req =
2690	    container_of(ex, struct rbd_obj_request, ex);
2691	struct ceph_bvec_iter *it = arg;
2692
2693	ceph_bvec_iter_advance_step(it, bytes, ({
2694		obj_req->bvec_count++;
2695	}));
2696}
2697
2698static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2699{
2700	struct rbd_obj_request *obj_req =
2701	    container_of(ex, struct rbd_obj_request, ex);
2702	struct ceph_bvec_iter *it = arg;
2703
2704	ceph_bvec_iter_advance_step(it, bytes, ({
2705		obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2706		obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2707	}));
2708}
2709
2710static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2711				     struct ceph_file_extent *img_extents,
2712				     u32 num_img_extents,
2713				     struct ceph_bvec_iter *bvec_pos)
2714{
2715	struct rbd_img_fill_ctx fctx = {
2716		.pos_type = OBJ_REQUEST_BVECS,
2717		.pos = (union rbd_img_fill_iter *)bvec_pos,
2718		.set_pos_fn = set_bvec_pos,
2719		.count_fn = count_bvecs,
2720		.copy_fn = copy_bvecs,
2721	};
2722
2723	return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2724				    &fctx);
2725}
2726
2727static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2728				   struct ceph_file_extent *img_extents,
2729				   u32 num_img_extents,
2730				   struct bio_vec *bvecs)
2731{
2732	struct ceph_bvec_iter it = {
2733		.bvecs = bvecs,
2734		.iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2735							     num_img_extents) },
2736	};
2737
2738	return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2739					 &it);
2740}
2741
2742static void rbd_img_handle_request_work(struct work_struct *work)
2743{
2744	struct rbd_img_request *img_req =
2745	    container_of(work, struct rbd_img_request, work);
2746
2747	rbd_img_handle_request(img_req, img_req->work_result);
2748}
2749
2750static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2751{
2752	INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2753	img_req->work_result = result;
2754	queue_work(rbd_wq, &img_req->work);
2755}
2756
2757static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2758{
2759	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2760
2761	if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2762		obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2763		return true;
2764	}
2765
2766	dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2767	     obj_req->ex.oe_objno);
2768	return false;
2769}
2770
2771static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2772{
2773	struct ceph_osd_request *osd_req;
2774	int ret;
2775
2776	osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2777	if (IS_ERR(osd_req))
2778		return PTR_ERR(osd_req);
2779
2780	osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2781			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2782	rbd_osd_setup_data(osd_req, 0);
2783	rbd_osd_format_read(osd_req);
2784
2785	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2786	if (ret)
2787		return ret;
2788
2789	rbd_osd_submit(osd_req);
2790	return 0;
2791}
2792
2793static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2794{
2795	struct rbd_img_request *img_req = obj_req->img_request;
2796	struct rbd_device *parent = img_req->rbd_dev->parent;
2797	struct rbd_img_request *child_img_req;
2798	int ret;
2799
2800	child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2801	if (!child_img_req)
2802		return -ENOMEM;
2803
2804	rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
2805	__set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2806	child_img_req->obj_request = obj_req;
2807
2808	down_read(&parent->header_rwsem);
2809	rbd_img_capture_header(child_img_req);
2810	up_read(&parent->header_rwsem);
2811
2812	dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2813	     obj_req);
2814
2815	if (!rbd_img_is_write(img_req)) {
2816		switch (img_req->data_type) {
2817		case OBJ_REQUEST_BIO:
2818			ret = __rbd_img_fill_from_bio(child_img_req,
2819						      obj_req->img_extents,
2820						      obj_req->num_img_extents,
2821						      &obj_req->bio_pos);
2822			break;
2823		case OBJ_REQUEST_BVECS:
2824		case OBJ_REQUEST_OWN_BVECS:
2825			ret = __rbd_img_fill_from_bvecs(child_img_req,
2826						      obj_req->img_extents,
2827						      obj_req->num_img_extents,
2828						      &obj_req->bvec_pos);
2829			break;
2830		default:
2831			BUG();
2832		}
2833	} else {
2834		ret = rbd_img_fill_from_bvecs(child_img_req,
2835					      obj_req->img_extents,
2836					      obj_req->num_img_extents,
2837					      obj_req->copyup_bvecs);
2838	}
2839	if (ret) {
2840		rbd_img_request_destroy(child_img_req);
2841		return ret;
2842	}
2843
2844	/* avoid parent chain recursion */
2845	rbd_img_schedule(child_img_req, 0);
2846	return 0;
2847}
2848
/*
 * Advance the read state machine of @obj_req by one step.
 *
 * @result carries the outcome of the previous step in and the final
 * outcome of the object request out.
 *
 * Returns true when the object request is finished (successfully or
 * with the error left in *result) and false after a request has been
 * issued -- presumably the state machine is then re-entered when that
 * request completes; confirm against the callers.
 */
static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

again:
	switch (obj_req->read_state) {
	case RBD_OBJ_READ_START:
		rbd_assert(!*result);

		/*
		 * The object map says the object can't exist: skip the
		 * OSD read and behave as if it had returned -ENOENT.
		 */
		if (!rbd_obj_may_exist(obj_req)) {
			*result = -ENOENT;
			obj_req->read_state = RBD_OBJ_READ_OBJECT;
			goto again;
		}

		ret = rbd_obj_read_object(obj_req);
		if (ret) {
			*result = ret;
			return true;
		}
		obj_req->read_state = RBD_OBJ_READ_OBJECT;
		return false;
	case RBD_OBJ_READ_OBJECT:
		/* object is missing but a parent may back this range */
		if (*result == -ENOENT && rbd_dev->parent_overlap) {
			/* reverse map this object extent onto the parent */
			ret = rbd_obj_calc_img_extents(obj_req, false);
			if (ret) {
				*result = ret;
				return true;
			}
			/* only read the parent if the range is within the overlap */
			if (obj_req->num_img_extents) {
				ret = rbd_obj_read_from_parent(obj_req);
				if (ret) {
					*result = ret;
					return true;
				}
				obj_req->read_state = RBD_OBJ_READ_PARENT;
				return false;
			}
		}

		/*
		 * -ENOENT means a hole in the image -- zero-fill the entire
		 * length of the request.  A short read also implies zero-fill
		 * to the end of the request.
		 */
		if (*result == -ENOENT) {
			rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
			*result = 0;
		} else if (*result >= 0) {
			/* a non-negative result is the number of bytes read */
			if (*result < obj_req->ex.oe_len)
				rbd_obj_zero_range(obj_req, *result,
						obj_req->ex.oe_len - *result);
			else
				rbd_assert(*result == obj_req->ex.oe_len);
			*result = 0;
		}
		/* any other negative result is passed on unchanged */
		return true;
	case RBD_OBJ_READ_PARENT:
		/*
		 * The parent image is read only up to the overlap -- zero-fill
		 * from the overlap to the end of the request.
		 */
		if (!*result) {
			u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);

			if (obj_overlap < obj_req->ex.oe_len)
				rbd_obj_zero_range(obj_req, obj_overlap,
					    obj_req->ex.oe_len - obj_overlap);
		}
		return true;
	default:
		BUG();
	}
}
2925
2926static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2927{
2928	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2929
2930	if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2931		obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2932
2933	if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2934	    (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2935		dout("%s %p noop for nonexistent\n", __func__, obj_req);
2936		return true;
2937	}
2938
2939	return false;
2940}
2941
2942/*
2943 * Return:
2944 *   0 - object map update sent
2945 *   1 - object map update isn't needed
2946 *  <0 - error
2947 */
2948static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
2949{
2950	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2951	u8 new_state;
2952
2953	if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
2954		return 1;
2955
2956	if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
2957		new_state = OBJECT_PENDING;
2958	else
2959		new_state = OBJECT_EXISTS;
2960
2961	return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
2962}
2963
2964static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
2965{
2966	struct ceph_osd_request *osd_req;
2967	int num_ops = count_write_ops(obj_req);
2968	int which = 0;
2969	int ret;
2970
2971	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
2972		num_ops++; /* stat */
2973
2974	osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
2975	if (IS_ERR(osd_req))
2976		return PTR_ERR(osd_req);
2977
2978	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
2979		ret = rbd_osd_setup_stat(osd_req, which++);
2980		if (ret)
2981			return ret;
2982	}
2983
2984	rbd_osd_setup_write_ops(osd_req, which);
2985	rbd_osd_format_write(osd_req);
2986
2987	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2988	if (ret)
2989		return ret;
2990
2991	rbd_osd_submit(osd_req);
2992	return 0;
2993}
2994
2995/*
2996 * copyup_bvecs pages are never highmem pages
2997 */
2998static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2999{
3000	struct ceph_bvec_iter it = {
3001		.bvecs = bvecs,
3002		.iter = { .bi_size = bytes },
3003	};
3004
3005	ceph_bvec_iter_advance_step(&it, bytes, ({
3006		if (memchr_inv(bvec_virt(&bv), 0, bv.bv_len))
3007			return false;
3008	}));
3009	return true;
3010}
3011
/*
 * Special "bytes" value for the copyup helpers: no copyup op is added
 * to the request, only the modification ops are sent.
 */
#define MODS_ONLY	U32_MAX
3013
3014static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3015				      u32 bytes)
3016{
3017	struct ceph_osd_request *osd_req;
3018	int ret;
3019
3020	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3021	rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3022
3023	osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3024	if (IS_ERR(osd_req))
3025		return PTR_ERR(osd_req);
3026
3027	ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3028	if (ret)
3029		return ret;
3030
3031	rbd_osd_format_write(osd_req);
3032
3033	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3034	if (ret)
3035		return ret;
3036
3037	rbd_osd_submit(osd_req);
3038	return 0;
3039}
3040
3041static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3042					u32 bytes)
3043{
3044	struct ceph_osd_request *osd_req;
3045	int num_ops = count_write_ops(obj_req);
3046	int which = 0;
3047	int ret;
3048
3049	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3050
3051	if (bytes != MODS_ONLY)
3052		num_ops++; /* copyup */
3053
3054	osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3055	if (IS_ERR(osd_req))
3056		return PTR_ERR(osd_req);
3057
3058	if (bytes != MODS_ONLY) {
3059		ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3060		if (ret)
3061			return ret;
3062	}
3063
3064	rbd_osd_setup_write_ops(osd_req, which);
3065	rbd_osd_format_write(osd_req);
3066
3067	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3068	if (ret)
3069		return ret;
3070
3071	rbd_osd_submit(osd_req);
3072	return 0;
3073}
3074
3075static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3076{
3077	u32 i;
3078
3079	rbd_assert(!obj_req->copyup_bvecs);
3080	obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3081	obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3082					sizeof(*obj_req->copyup_bvecs),
3083					GFP_NOIO);
3084	if (!obj_req->copyup_bvecs)
3085		return -ENOMEM;
3086
3087	for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3088		unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3089		struct page *page = alloc_page(GFP_NOIO);
3090
3091		if (!page)
3092			return -ENOMEM;
3093
3094		bvec_set_page(&obj_req->copyup_bvecs[i], page, len, 0);
3095		obj_overlap -= len;
3096	}
3097
3098	rbd_assert(!obj_overlap);
3099	return 0;
3100}
3101
3102/*
3103 * The target object doesn't exist.  Read the data for the entire
3104 * target object up to the overlap point (if any) from the parent,
3105 * so we can use it for a copyup.
3106 */
3107static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3108{
3109	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3110	int ret;
3111
3112	rbd_assert(obj_req->num_img_extents);
3113	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3114		      rbd_dev->parent_overlap);
3115	if (!obj_req->num_img_extents) {
3116		/*
3117		 * The overlap has become 0 (most likely because the
3118		 * image has been flattened).  Re-submit the original write
3119		 * request -- pass MODS_ONLY since the copyup isn't needed
3120		 * anymore.
3121		 */
3122		return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3123	}
3124
3125	ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3126	if (ret)
3127		return ret;
3128
3129	return rbd_obj_read_from_parent(obj_req);
3130}
3131
3132static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3133{
3134	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3135	struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3136	u8 new_state;
3137	u32 i;
3138	int ret;
3139
3140	rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3141
3142	if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3143		return;
3144
3145	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3146		return;
3147
3148	for (i = 0; i < snapc->num_snaps; i++) {
3149		if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3150		    i + 1 < snapc->num_snaps)
3151			new_state = OBJECT_EXISTS_CLEAN;
3152		else
3153			new_state = OBJECT_EXISTS;
3154
3155		ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3156					    new_state, NULL);
3157		if (ret < 0) {
3158			obj_req->pending.result = ret;
3159			return;
3160		}
3161
3162		rbd_assert(!ret);
3163		obj_req->pending.num_pending++;
3164	}
3165}
3166
3167static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3168{
3169	u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3170	int ret;
3171
3172	rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3173
3174	/*
3175	 * Only send non-zero copyup data to save some I/O and network
3176	 * bandwidth -- zero copyup data is equivalent to the object not
3177	 * existing.
3178	 */
3179	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3180		bytes = 0;
3181
3182	if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3183		/*
3184		 * Send a copyup request with an empty snapshot context to
3185		 * deep-copyup the object through all existing snapshots.
3186		 * A second request with the current snapshot context will be
3187		 * sent for the actual modification.
3188		 */
3189		ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3190		if (ret) {
3191			obj_req->pending.result = ret;
3192			return;
3193		}
3194
3195		obj_req->pending.num_pending++;
3196		bytes = MODS_ONLY;
3197	}
3198
3199	ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3200	if (ret) {
3201		obj_req->pending.result = ret;
3202		return;
3203	}
3204
3205	obj_req->pending.num_pending++;
3206}
3207
/*
 * Advance the copyup state machine of @obj_req by one step.
 *
 * @result carries the outcome of the previous step in and the outcome
 * of the copyup out.
 *
 * The states prefixed with "__" are entered while requests are still
 * outstanding; each invocation in such a state accounts for one
 * completion via pending_result_dec() and only proceeds to the
 * unprefixed state once that helper returns true.
 *
 * Returns true when the copyup is finished (successfully or with the
 * error left in *result) and false while requests are outstanding.
 */
static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

again:
	switch (obj_req->copyup_state) {
	case RBD_OBJ_COPYUP_START:
		rbd_assert(!*result);

		ret = rbd_obj_copyup_read_parent(obj_req);
		if (ret) {
			*result = ret;
			return true;
		}
		/*
		 * With image extents left a parent read was issued;
		 * without, the modification was sent on its own.
		 */
		if (obj_req->num_img_extents)
			obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
		else
			obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
		return false;
	case RBD_OBJ_COPYUP_READ_PARENT:
		if (*result)
			return true;

		/* remember all-zero parent data so it isn't copied up */
		if (is_zero_bvecs(obj_req->copyup_bvecs,
				  rbd_obj_img_extents_bytes(obj_req))) {
			dout("%s %p detected zeros\n", __func__, obj_req);
			obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
		}

		rbd_obj_copyup_object_maps(obj_req);
		/* nothing was sent (not needed, or failed up front) */
		if (!obj_req->pending.num_pending) {
			*result = obj_req->pending.result;
			obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
			goto again;
		}
		obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
		return false;
	case __RBD_OBJ_COPYUP_OBJECT_MAPS:
		/* wait for the remaining object map updates */
		if (!pending_result_dec(&obj_req->pending, result))
			return false;
		fallthrough;
	case RBD_OBJ_COPYUP_OBJECT_MAPS:
		if (*result) {
			rbd_warn(rbd_dev, "snap object map update failed: %d",
				 *result);
			return true;
		}

		rbd_obj_copyup_write_object(obj_req);
		/* nothing was sent -- the first request failed up front */
		if (!obj_req->pending.num_pending) {
			*result = obj_req->pending.result;
			obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
			goto again;
		}
		obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
		return false;
	case __RBD_OBJ_COPYUP_WRITE_OBJECT:
		/* wait for the remaining write requests */
		if (!pending_result_dec(&obj_req->pending, result))
			return false;
		fallthrough;
	case RBD_OBJ_COPYUP_WRITE_OBJECT:
		return true;
	default:
		BUG();
	}
}
3275
3276/*
3277 * Return:
3278 *   0 - object map update sent
3279 *   1 - object map update isn't needed
3280 *  <0 - error
3281 */
3282static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3283{
3284	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3285	u8 current_state = OBJECT_PENDING;
3286
3287	if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3288		return 1;
3289
3290	if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3291		return 1;
3292
3293	return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3294				     &current_state);
3295}
3296
/*
 * Drive the write state machine of @obj_req.
 *
 * On entry *@result is the outcome of the step that has just finished;
 * when true is returned it holds the outcome of the object request.
 *
 * Return true if @obj_req is finished (*@result may be an error), false
 * if the state machine has to be invoked again later.
 */
static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

again:
	switch (obj_req->write_state) {
	case RBD_OBJ_WRITE_START:
		rbd_assert(!*result);

		rbd_obj_set_copyup_enabled(obj_req);
		if (rbd_obj_write_is_noop(obj_req))
			return true;

		ret = rbd_obj_write_pre_object_map(obj_req);
		if (ret < 0) {
			*result = ret;
			return true;
		}
		obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
		/*
		 * ret > 0: continue with the next state right away
		 * (presumably "update isn't needed", as for the post
		 * object map helper -- confirm against
		 * rbd_obj_write_pre_object_map()).
		 */
		if (ret > 0)
			goto again;
		return false;
	case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
		if (*result) {
			rbd_warn(rbd_dev, "pre object map update failed: %d",
				 *result);
			return true;
		}
		ret = rbd_obj_write_object(obj_req);
		if (ret) {
			*result = ret;
			return true;
		}
		obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
		return false;
	case RBD_OBJ_WRITE_OBJECT:
		if (*result == -ENOENT) {
			/*
			 * With copyup enabled, -ENOENT restarts the request
			 * as a copyup: clear the error and hand over to the
			 * copyup state machine.
			 */
			if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
				*result = 0;
				obj_req->copyup_state = RBD_OBJ_COPYUP_START;
				obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
				goto again;
			}
			/*
			 * On a non-existent object:
			 *   delete - -ENOENT, truncate/zero - 0
			 */
			if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
				*result = 0;
		}
		if (*result)
			return true;

		/* no copyup was needed, skip to the post-copyup state */
		obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
		goto again;
	case __RBD_OBJ_WRITE_COPYUP:
		/* stay here until the copyup state machine is finished */
		if (!rbd_obj_advance_copyup(obj_req, result))
			return false;
		fallthrough;
	case RBD_OBJ_WRITE_COPYUP:
		if (*result) {
			rbd_warn(rbd_dev, "copyup failed: %d", *result);
			return true;
		}
		ret = rbd_obj_write_post_object_map(obj_req);
		if (ret < 0) {
			*result = ret;
			return true;
		}
		obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
		/* ret > 0: no post object map update needed, finish now */
		if (ret > 0)
			goto again;
		return false;
	case RBD_OBJ_WRITE_POST_OBJECT_MAP:
		/* terminal state: a failure is logged and passed on as is */
		if (*result)
			rbd_warn(rbd_dev, "post object map update failed: %d",
				 *result);
		return true;
	default:
		BUG();
	}
}
3380
3381/*
3382 * Return true if @obj_req is completed.
3383 */
3384static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3385				     int *result)
3386{
3387	struct rbd_img_request *img_req = obj_req->img_request;
3388	struct rbd_device *rbd_dev = img_req->rbd_dev;
3389	bool done;
3390
3391	mutex_lock(&obj_req->state_mutex);
3392	if (!rbd_img_is_write(img_req))
3393		done = rbd_obj_advance_read(obj_req, result);
3394	else
3395		done = rbd_obj_advance_write(obj_req, result);
3396	mutex_unlock(&obj_req->state_mutex);
3397
3398	if (done && *result) {
3399		rbd_assert(*result < 0);
3400		rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3401			 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3402			 obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3403	}
3404	return done;
3405}
3406
3407/*
3408 * This is open-coded in rbd_img_handle_request() to avoid parent chain
3409 * recursion.
3410 */
3411static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3412{
3413	if (__rbd_obj_handle_request(obj_req, &result))
3414		rbd_img_handle_request(obj_req->img_request, result);
3415}
3416
3417static bool need_exclusive_lock(struct rbd_img_request *img_req)
3418{
3419	struct rbd_device *rbd_dev = img_req->rbd_dev;
3420
3421	if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3422		return false;
3423
3424	if (rbd_is_ro(rbd_dev))
3425		return false;
3426
3427	rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3428	if (rbd_dev->opts->lock_on_read ||
3429	    (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3430		return true;
3431
3432	return rbd_img_is_write(img_req);
3433}
3434
3435static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3436{
3437	struct rbd_device *rbd_dev = img_req->rbd_dev;
3438	bool locked;
3439
3440	lockdep_assert_held(&rbd_dev->lock_rwsem);
3441	locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3442	spin_lock(&rbd_dev->lock_lists_lock);
3443	rbd_assert(list_empty(&img_req->lock_item));
3444	if (!locked)
3445		list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3446	else
3447		list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3448	spin_unlock(&rbd_dev->lock_lists_lock);
3449	return locked;
3450}
3451
3452static void rbd_lock_del_request(struct rbd_img_request *img_req)
3453{
3454	struct rbd_device *rbd_dev = img_req->rbd_dev;
3455	bool need_wakeup = false;
3456
3457	lockdep_assert_held(&rbd_dev->lock_rwsem);
3458	spin_lock(&rbd_dev->lock_lists_lock);
3459	if (!list_empty(&img_req->lock_item)) {
3460		list_del_init(&img_req->lock_item);
3461		need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3462			       list_empty(&rbd_dev->running_list));
3463	}
3464	spin_unlock(&rbd_dev->lock_lists_lock);
3465	if (need_wakeup)
3466		complete(&rbd_dev->releasing_wait);
3467}
3468
3469static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3470{
3471	struct rbd_device *rbd_dev = img_req->rbd_dev;
3472
3473	if (!need_exclusive_lock(img_req))
3474		return 1;
3475
3476	if (rbd_lock_add_request(img_req))
3477		return 1;
3478
3479	if (rbd_dev->opts->exclusive) {
3480		WARN_ON(1); /* lock got released? */
3481		return -EROFS;
3482	}
3483
3484	/*
3485	 * Note the use of mod_delayed_work() in rbd_acquire_lock()
3486	 * and cancel_delayed_work() in wake_lock_waiters().
3487	 */
3488	dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3489	queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3490	return 0;
3491}
3492
3493static void rbd_img_object_requests(struct rbd_img_request *img_req)
3494{
3495	struct rbd_device *rbd_dev = img_req->rbd_dev;
3496	struct rbd_obj_request *obj_req;
3497
3498	rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3499	rbd_assert(!need_exclusive_lock(img_req) ||
3500		   __rbd_is_lock_owner(rbd_dev));
3501
3502	if (rbd_img_is_write(img_req)) {
3503		rbd_assert(!img_req->snapc);
3504		down_read(&rbd_dev->header_rwsem);
3505		img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
3506		up_read(&rbd_dev->header_rwsem);
3507	}
3508
3509	for_each_obj_request(img_req, obj_req) {
3510		int result = 0;
3511
3512		if (__rbd_obj_handle_request(obj_req, &result)) {
3513			if (result) {
3514				img_req->pending.result = result;
3515				return;
3516			}
3517		} else {
3518			img_req->pending.num_pending++;
3519		}
3520	}
3521}
3522
/*
 * Drive the state machine of @img_req.
 *
 * On entry *@result is the outcome of the step that has just finished;
 * when true is returned it holds the outcome of the image request.
 *
 * Return true if @img_req is finished (*@result may be an error), false
 * if the state machine has to be invoked again later.
 */
static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
{
	int ret;

again:
	switch (img_req->state) {
	case RBD_IMG_START:
		rbd_assert(!*result);

		ret = rbd_img_exclusive_lock(img_req);
		if (ret < 0) {
			*result = ret;
			return true;
		}
		img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
		/* ret > 0: no need to wait for the lock, carry on right away */
		if (ret > 0)
			goto again;
		return false;
	case RBD_IMG_EXCLUSIVE_LOCK:
		/* a non-zero result here ends the request before any I/O */
		if (*result)
			return true;

		rbd_img_object_requests(img_req);
		if (!img_req->pending.num_pending) {
			/* nothing in flight: take pending.result and finish */
			*result = img_req->pending.result;
			img_req->state = RBD_IMG_OBJECT_REQUESTS;
			goto again;
		}
		img_req->state = __RBD_IMG_OBJECT_REQUESTS;
		return false;
	case __RBD_IMG_OBJECT_REQUESTS:
		/* stay here until pending_result_dec() reports completion */
		if (!pending_result_dec(&img_req->pending, result))
			return false;
		fallthrough;
	case RBD_IMG_OBJECT_REQUESTS:
		/* terminal state: *result is the outcome of the request */
		return true;
	default:
		BUG();
	}
}
3563
3564/*
3565 * Return true if @img_req is completed.
3566 */
3567static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3568				     int *result)
3569{
3570	struct rbd_device *rbd_dev = img_req->rbd_dev;
3571	bool done;
3572
3573	if (need_exclusive_lock(img_req)) {
3574		down_read(&rbd_dev->lock_rwsem);
3575		mutex_lock(&img_req->state_mutex);
3576		done = rbd_img_advance(img_req, result);
3577		if (done)
3578			rbd_lock_del_request(img_req);
3579		mutex_unlock(&img_req->state_mutex);
3580		up_read(&rbd_dev->lock_rwsem);
3581	} else {
3582		mutex_lock(&img_req->state_mutex);
3583		done = rbd_img_advance(img_req, result);
3584		mutex_unlock(&img_req->state_mutex);
3585	}
3586
3587	if (done && *result) {
3588		rbd_assert(*result < 0);
3589		rbd_warn(rbd_dev, "%s%s result %d",
3590		      test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3591		      obj_op_name(img_req->op_type), *result);
3592	}
3593	return done;
3594}
3595
3596static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3597{
3598again:
3599	if (!__rbd_img_handle_request(img_req, &result))
3600		return;
3601
3602	if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3603		struct rbd_obj_request *obj_req = img_req->obj_request;
3604
3605		rbd_img_request_destroy(img_req);
3606		if (__rbd_obj_handle_request(obj_req, &result)) {
3607			img_req = obj_req->img_request;
3608			goto again;
3609		}
3610	} else {
3611		struct request *rq = blk_mq_rq_from_pdu(img_req);
3612
3613		rbd_img_request_destroy(img_req);
3614		blk_mq_end_request(rq, errno_to_blk_status(result));
3615	}
3616}
3617
/* Zero-initialized client id, used as the "no owner" value for owner_cid. */
static const struct rbd_client_id rbd_empty_cid;
3619
3620static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3621			  const struct rbd_client_id *rhs)
3622{
3623	return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3624}
3625
3626static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3627{
3628	struct rbd_client_id cid;
3629
3630	mutex_lock(&rbd_dev->watch_mutex);
3631	cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3632	cid.handle = rbd_dev->watch_cookie;
3633	mutex_unlock(&rbd_dev->watch_mutex);
3634	return cid;
3635}
3636
3637/*
3638 * lock_rwsem must be held for write
3639 */
3640static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3641			      const struct rbd_client_id *cid)
3642{
3643	dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3644	     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3645	     cid->gid, cid->handle);
3646	rbd_dev->owner_cid = *cid; /* struct */
3647}
3648
3649static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3650{
3651	mutex_lock(&rbd_dev->watch_mutex);
3652	sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3653	mutex_unlock(&rbd_dev->watch_mutex);
3654}
3655
3656static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3657{
3658	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3659
3660	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3661	strcpy(rbd_dev->lock_cookie, cookie);
3662	rbd_set_owner_cid(rbd_dev, &cid);
3663	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3664}
3665
3666/*
3667 * lock_rwsem must be held for write
3668 */
3669static int rbd_lock(struct rbd_device *rbd_dev)
3670{
3671	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3672	char cookie[32];
3673	int ret;
3674
3675	WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3676		rbd_dev->lock_cookie[0] != '\0');
3677
3678	format_lock_cookie(rbd_dev, cookie);
3679	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3680			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3681			    RBD_LOCK_TAG, "", 0);
3682	if (ret && ret != -EEXIST)
3683		return ret;
3684
3685	__rbd_lock(rbd_dev, cookie);
3686	return 0;
3687}
3688
3689/*
3690 * lock_rwsem must be held for write
3691 */
3692static void rbd_unlock(struct rbd_device *rbd_dev)
3693{
3694	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3695	int ret;
3696
3697	WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3698		rbd_dev->lock_cookie[0] == '\0');
3699
3700	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3701			      RBD_LOCK_NAME, rbd_dev->lock_cookie);
3702	if (ret && ret != -ENOENT)
3703		rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3704
3705	/* treat errors as the image is unlocked */
3706	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3707	rbd_dev->lock_cookie[0] = '\0';
3708	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3709	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3710}
3711
3712static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3713				enum rbd_notify_op notify_op,
3714				struct page ***preply_pages,
3715				size_t *preply_len)
3716{
3717	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3718	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3719	char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3720	int buf_size = sizeof(buf);
3721	void *p = buf;
3722
3723	dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3724
3725	/* encode *LockPayload NotifyMessage (op + ClientId) */
3726	ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3727	ceph_encode_32(&p, notify_op);
3728	ceph_encode_64(&p, cid.gid);
3729	ceph_encode_64(&p, cid.handle);
3730
3731	return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3732				&rbd_dev->header_oloc, buf, buf_size,
3733				RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3734}
3735
/*
 * Send a lock notification without asking for the reply; the result of
 * __rbd_notify_op_lock() is not checked.
 */
static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
			       enum rbd_notify_op notify_op)
{
	__rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
}
3741
3742static void rbd_notify_acquired_lock(struct work_struct *work)
3743{
3744	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3745						  acquired_lock_work);
3746
3747	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3748}
3749
3750static void rbd_notify_released_lock(struct work_struct *work)
3751{
3752	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3753						  released_lock_work);
3754
3755	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3756}
3757
3758static int rbd_request_lock(struct rbd_device *rbd_dev)
3759{
3760	struct page **reply_pages;
3761	size_t reply_len;
3762	bool lock_owner_responded = false;
3763	int ret;
3764
3765	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3766
3767	ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3768				   &reply_pages, &reply_len);
3769	if (ret && ret != -ETIMEDOUT) {
3770		rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3771		goto out;
3772	}
3773
3774	if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3775		void *p = page_address(reply_pages[0]);
3776		void *const end = p + reply_len;
3777		u32 n;
3778
3779		ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3780		while (n--) {
3781			u8 struct_v;
3782			u32 len;
3783
3784			ceph_decode_need(&p, end, 8 + 8, e_inval);
3785			p += 8 + 8; /* skip gid and cookie */
3786
3787			ceph_decode_32_safe(&p, end, len, e_inval);
3788			if (!len)
3789				continue;
3790
3791			if (lock_owner_responded) {
3792				rbd_warn(rbd_dev,
3793					 "duplicate lock owners detected");
3794				ret = -EIO;
3795				goto out;
3796			}
3797
3798			lock_owner_responded = true;
3799			ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3800						  &struct_v, &len);
3801			if (ret) {
3802				rbd_warn(rbd_dev,
3803					 "failed to decode ResponseMessage: %d",
3804					 ret);
3805				goto e_inval;
3806			}
3807
3808			ret = ceph_decode_32(&p);
3809		}
3810	}
3811
3812	if (!lock_owner_responded) {
3813		rbd_warn(rbd_dev, "no lock owners detected");
3814		ret = -ETIMEDOUT;
3815	}
3816
3817out:
3818	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3819	return ret;
3820
3821e_inval:
3822	ret = -EINVAL;
3823	goto out;
3824}
3825
3826/*
3827 * Either image request state machine(s) or rbd_add_acquire_lock()
3828 * (i.e. "rbd map").
3829 */
3830static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3831{
3832	struct rbd_img_request *img_req;
3833
3834	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3835	lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3836
3837	cancel_delayed_work(&rbd_dev->lock_dwork);
3838	if (!completion_done(&rbd_dev->acquire_wait)) {
3839		rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3840			   list_empty(&rbd_dev->running_list));
3841		rbd_dev->acquire_err = result;
3842		complete_all(&rbd_dev->acquire_wait);
3843		return;
3844	}
3845
3846	while (!list_empty(&rbd_dev->acquiring_list)) {
3847		img_req = list_first_entry(&rbd_dev->acquiring_list,
3848					   struct rbd_img_request, lock_item);
3849		mutex_lock(&img_req->state_mutex);
3850		rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3851		if (!result)
3852			list_move_tail(&img_req->lock_item,
3853				       &rbd_dev->running_list);
3854		else
3855			list_del_init(&img_req->lock_item);
3856		rbd_img_schedule(img_req, result);
3857		mutex_unlock(&img_req->state_mutex);
3858	}
3859}
3860
3861static bool locker_equal(const struct ceph_locker *lhs,
3862			 const struct ceph_locker *rhs)
3863{
3864	return lhs->id.name.type == rhs->id.name.type &&
3865	       lhs->id.name.num == rhs->id.name.num &&
3866	       !strcmp(lhs->id.cookie, rhs->id.cookie) &&
3867	       ceph_addr_equal_no_type(&lhs->info.addr, &rhs->info.addr);
3868}
3869
/*
 * Free a single locker as returned by get_lock_owner_info().  A NULL
 * @locker is ignored.
 */
static void free_locker(struct ceph_locker *locker)
{
	if (!locker)
		return;

	ceph_free_lockers(locker, 1);
}
3875
3876static struct ceph_locker *get_lock_owner_info(struct rbd_device *rbd_dev)
3877{
3878	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3879	struct ceph_locker *lockers;
3880	u32 num_lockers;
3881	u8 lock_type;
3882	char *lock_tag;
3883	u64 handle;
3884	int ret;
3885
3886	ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3887				 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3888				 &lock_type, &lock_tag, &lockers, &num_lockers);
3889	if (ret) {
3890		rbd_warn(rbd_dev, "failed to get header lockers: %d", ret);
3891		return ERR_PTR(ret);
3892	}
3893
3894	if (num_lockers == 0) {
3895		dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3896		lockers = NULL;
3897		goto out;
3898	}
3899
3900	if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3901		rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3902			 lock_tag);
3903		goto err_busy;
3904	}
3905
3906	if (lock_type != CEPH_CLS_LOCK_EXCLUSIVE) {
3907		rbd_warn(rbd_dev, "incompatible lock type detected");
3908		goto err_busy;
3909	}
3910
3911	WARN_ON(num_lockers != 1);
3912	ret = sscanf(lockers[0].id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu",
3913		     &handle);
3914	if (ret != 1) {
3915		rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3916			 lockers[0].id.cookie);
3917		goto err_busy;
3918	}
3919	if (ceph_addr_is_blank(&lockers[0].info.addr)) {
3920		rbd_warn(rbd_dev, "locker has a blank address");
3921		goto err_busy;
3922	}
3923
3924	dout("%s rbd_dev %p got locker %s%llu@%pISpc/%u handle %llu\n",
3925	     __func__, rbd_dev, ENTITY_NAME(lockers[0].id.name),
3926	     &lockers[0].info.addr.in_addr,
3927	     le32_to_cpu(lockers[0].info.addr.nonce), handle);
3928
3929out:
3930	kfree(lock_tag);
3931	return lockers;
3932
3933err_busy:
3934	kfree(lock_tag);
3935	ceph_free_lockers(lockers, num_lockers);
3936	return ERR_PTR(-EBUSY);
3937}
3938
3939static int find_watcher(struct rbd_device *rbd_dev,
3940			const struct ceph_locker *locker)
3941{
3942	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3943	struct ceph_watch_item *watchers;
3944	u32 num_watchers;
3945	u64 cookie;
3946	int i;
3947	int ret;
3948
3949	ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3950				      &rbd_dev->header_oloc, &watchers,
3951				      &num_watchers);
3952	if (ret) {
3953		rbd_warn(rbd_dev, "failed to get watchers: %d", ret);
3954		return ret;
3955	}
3956
3957	sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3958	for (i = 0; i < num_watchers; i++) {
3959		/*
3960		 * Ignore addr->type while comparing.  This mimics
3961		 * entity_addr_t::get_legacy_str() + strcmp().
3962		 */
3963		if (ceph_addr_equal_no_type(&watchers[i].addr,
3964					    &locker->info.addr) &&
3965		    watchers[i].cookie == cookie) {
3966			struct rbd_client_id cid = {
3967				.gid = le64_to_cpu(watchers[i].name.num),
3968				.handle = cookie,
3969			};
3970
3971			dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3972			     rbd_dev, cid.gid, cid.handle);
3973			rbd_set_owner_cid(rbd_dev, &cid);
3974			ret = 1;
3975			goto out;
3976		}
3977	}
3978
3979	dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3980	ret = 0;
3981out:
3982	kfree(watchers);
3983	return ret;
3984}
3985
3986/*
3987 * lock_rwsem must be held for write
3988 */
3989static int rbd_try_lock(struct rbd_device *rbd_dev)
3990{
3991	struct ceph_client *client = rbd_dev->rbd_client->client;
3992	struct ceph_locker *locker, *refreshed_locker;
3993	int ret;
3994
3995	for (;;) {
3996		locker = refreshed_locker = NULL;
3997
3998		ret = rbd_lock(rbd_dev);
3999		if (!ret)
4000			goto out;
4001		if (ret != -EBUSY) {
4002			rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4003			goto out;
4004		}
4005
4006		/* determine if the current lock holder is still alive */
4007		locker = get_lock_owner_info(rbd_dev);
4008		if (IS_ERR(locker)) {
4009			ret = PTR_ERR(locker);
4010			locker = NULL;
4011			goto out;
4012		}
4013		if (!locker)
4014			goto again;
4015
4016		ret = find_watcher(rbd_dev, locker);
4017		if (ret)
4018			goto out; /* request lock or error */
4019
4020		refreshed_locker = get_lock_owner_info(rbd_dev);
4021		if (IS_ERR(refreshed_locker)) {
4022			ret = PTR_ERR(refreshed_locker);
4023			refreshed_locker = NULL;
4024			goto out;
4025		}
4026		if (!refreshed_locker ||
4027		    !locker_equal(locker, refreshed_locker))
4028			goto again;
4029
4030		rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
4031			 ENTITY_NAME(locker->id.name));
4032
4033		ret = ceph_monc_blocklist_add(&client->monc,
4034					      &locker->info.addr);
4035		if (ret) {
4036			rbd_warn(rbd_dev, "failed to blocklist %s%llu: %d",
4037				 ENTITY_NAME(locker->id.name), ret);
4038			goto out;
4039		}
4040
4041		ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
4042					  &rbd_dev->header_oloc, RBD_LOCK_NAME,
4043					  locker->id.cookie, &locker->id.name);
4044		if (ret && ret != -ENOENT) {
4045			rbd_warn(rbd_dev, "failed to break header lock: %d",
4046				 ret);
4047			goto out;
4048		}
4049
4050again:
4051		free_locker(refreshed_locker);
4052		free_locker(locker);
4053	}
4054
4055out:
4056	free_locker(refreshed_locker);
4057	free_locker(locker);
4058	return ret;
4059}
4060
4061static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
4062{
4063	int ret;
4064
4065	ret = rbd_dev_refresh(rbd_dev);
4066	if (ret)
4067		return ret;
4068
4069	if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4070		ret = rbd_object_map_open(rbd_dev);
4071		if (ret)
4072			return ret;
4073	}
4074
4075	return 0;
4076}
4077
4078/*
4079 * Return:
4080 *   0 - lock acquired
4081 *   1 - caller should call rbd_request_lock()
4082 *  <0 - error
4083 */
4084static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4085{
4086	int ret;
4087
4088	down_read(&rbd_dev->lock_rwsem);
4089	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4090	     rbd_dev->lock_state);
4091	if (__rbd_is_lock_owner(rbd_dev)) {
4092		up_read(&rbd_dev->lock_rwsem);
4093		return 0;
4094	}
4095
4096	up_read(&rbd_dev->lock_rwsem);
4097	down_write(&rbd_dev->lock_rwsem);
4098	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4099	     rbd_dev->lock_state);
4100	if (__rbd_is_lock_owner(rbd_dev)) {
4101		up_write(&rbd_dev->lock_rwsem);
4102		return 0;
4103	}
4104
4105	ret = rbd_try_lock(rbd_dev);
4106	if (ret < 0) {
4107		rbd_warn(rbd_dev, "failed to acquire lock: %d", ret);
4108		goto out;
4109	}
4110	if (ret > 0) {
4111		up_write(&rbd_dev->lock_rwsem);
4112		return ret;
4113	}
4114
4115	rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4116	rbd_assert(list_empty(&rbd_dev->running_list));
4117
4118	ret = rbd_post_acquire_action(rbd_dev);
4119	if (ret) {
4120		rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4121		/*
4122		 * Can't stay in RBD_LOCK_STATE_LOCKED because
4123		 * rbd_lock_add_request() would let the request through,
4124		 * assuming that e.g. object map is locked and loaded.
4125		 */
4126		rbd_unlock(rbd_dev);
4127	}
4128
4129out:
4130	wake_lock_waiters(rbd_dev, ret);
4131	up_write(&rbd_dev->lock_rwsem);
4132	return ret;
4133}
4134
4135static void rbd_acquire_lock(struct work_struct *work)
4136{
4137	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4138					    struct rbd_device, lock_dwork);
4139	int ret;
4140
4141	dout("%s rbd_dev %p\n", __func__, rbd_dev);
4142again:
4143	ret = rbd_try_acquire_lock(rbd_dev);
4144	if (ret <= 0) {
4145		dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4146		return;
4147	}
4148
4149	ret = rbd_request_lock(rbd_dev);
4150	if (ret == -ETIMEDOUT) {
4151		goto again; /* treat this as a dead client */
4152	} else if (ret == -EROFS) {
4153		rbd_warn(rbd_dev, "peer will not release lock");
4154		down_write(&rbd_dev->lock_rwsem);
4155		wake_lock_waiters(rbd_dev, ret);
4156		up_write(&rbd_dev->lock_rwsem);
4157	} else if (ret < 0) {
4158		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4159		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4160				 RBD_RETRY_DELAY);
4161	} else {
4162		/*
4163		 * lock owner acked, but resend if we don't see them
4164		 * release the lock
4165		 */
4166		dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4167		     rbd_dev);
4168		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4169		    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4170	}
4171}
4172
/*
 * Move from RBD_LOCK_STATE_LOCKED to RBD_LOCK_STATE_RELEASING and wait
 * for running_list to drain.
 *
 * lock_rwsem must be held for write (asserted below).  If running_list
 * is not empty, lock_rwsem is dropped while waiting on releasing_wait
 * and re-taken for write before returning.
 *
 * Return true if the lock state is RBD_LOCK_STATE_RELEASING and
 * running_list is empty, false if the lock was not in
 * RBD_LOCK_STATE_LOCKED to begin with or the state changed while
 * lock_rwsem was dropped.
 */
static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
{
	dout("%s rbd_dev %p\n", __func__, rbd_dev);
	lockdep_assert_held_write(&rbd_dev->lock_rwsem);

	if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
		return false;

	/*
	 * Ensure that all in-flight IO is flushed.
	 */
	rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
	rbd_assert(!completion_done(&rbd_dev->releasing_wait));
	if (list_empty(&rbd_dev->running_list))
		return true;

	/* releasing_wait is completed from rbd_lock_del_request() */
	up_write(&rbd_dev->lock_rwsem);
	wait_for_completion(&rbd_dev->releasing_wait);

	down_write(&rbd_dev->lock_rwsem);
	/* the state may have changed while lock_rwsem was not held */
	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
		return false;

	rbd_assert(list_empty(&rbd_dev->running_list));
	return true;
}
4199
4200static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4201{
4202	if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4203		rbd_object_map_close(rbd_dev);
4204}
4205
/*
 * Run the pre-release action and then unlock.  running_list is
 * asserted to be empty on entry.
 */
static void __rbd_release_lock(struct rbd_device *rbd_dev)
{
	rbd_assert(list_empty(&rbd_dev->running_list));

	rbd_pre_release_action(rbd_dev);
	rbd_unlock(rbd_dev);
}
4213
4214/*
4215 * lock_rwsem must be held for write
4216 */
4217static void rbd_release_lock(struct rbd_device *rbd_dev)
4218{
4219	if (!rbd_quiesce_lock(rbd_dev))
4220		return;
4221
4222	__rbd_release_lock(rbd_dev);
4223
4224	/*
4225	 * Give others a chance to grab the lock - we would re-acquire
4226	 * almost immediately if we got new IO while draining the running
4227	 * list otherwise.  We need to ack our own notifications, so this
4228	 * lock_dwork will be requeued from rbd_handle_released_lock() by
4229	 * way of maybe_kick_acquire().
4230	 */
4231	cancel_delayed_work(&rbd_dev->lock_dwork);
4232}
4233
4234static void rbd_release_lock_work(struct work_struct *work)
4235{
4236	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4237						  unlock_work);
4238
4239	down_write(&rbd_dev->lock_rwsem);
4240	rbd_release_lock(rbd_dev);
4241	up_write(&rbd_dev->lock_rwsem);
4242}
4243
4244static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4245{
4246	bool have_requests;
4247
4248	dout("%s rbd_dev %p\n", __func__, rbd_dev);
4249	if (__rbd_is_lock_owner(rbd_dev))
4250		return;
4251
4252	spin_lock(&rbd_dev->lock_lists_lock);
4253	have_requests = !list_empty(&rbd_dev->acquiring_list);
4254	spin_unlock(&rbd_dev->lock_lists_lock);
4255	if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4256		dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4257		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4258	}
4259}
4260
4261static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4262				     void **p)
4263{
4264	struct rbd_client_id cid = { 0 };
4265
4266	if (struct_v >= 2) {
4267		cid.gid = ceph_decode_64(p);
4268		cid.handle = ceph_decode_64(p);
4269	}
4270
4271	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4272	     cid.handle);
4273	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4274		down_write(&rbd_dev->lock_rwsem);
4275		if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4276			dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4277			     __func__, rbd_dev, cid.gid, cid.handle);
4278		} else {
4279			rbd_set_owner_cid(rbd_dev, &cid);
4280		}
4281		downgrade_write(&rbd_dev->lock_rwsem);
4282	} else {
4283		down_read(&rbd_dev->lock_rwsem);
4284	}
4285
4286	maybe_kick_acquire(rbd_dev);
4287	up_read(&rbd_dev->lock_rwsem);
4288}
4289
4290static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4291				     void **p)
4292{
4293	struct rbd_client_id cid = { 0 };
4294
4295	if (struct_v >= 2) {
4296		cid.gid = ceph_decode_64(p);
4297		cid.handle = ceph_decode_64(p);
4298	}
4299
4300	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4301	     cid.handle);
4302	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4303		down_write(&rbd_dev->lock_rwsem);
4304		if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4305			dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
4306			     __func__, rbd_dev, cid.gid, cid.handle,
4307			     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4308		} else {
4309			rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4310		}
4311		downgrade_write(&rbd_dev->lock_rwsem);
4312	} else {
4313		down_read(&rbd_dev->lock_rwsem);
4314	}
4315
4316	maybe_kick_acquire(rbd_dev);
4317	up_read(&rbd_dev->lock_rwsem);
4318}
4319
4320/*
4321 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4322 * ResponseMessage is needed.
4323 */
4324static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4325				   void **p)
4326{
4327	struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4328	struct rbd_client_id cid = { 0 };
4329	int result = 1;
4330
4331	if (struct_v >= 2) {
4332		cid.gid = ceph_decode_64(p);
4333		cid.handle = ceph_decode_64(p);
4334	}
4335
4336	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4337	     cid.handle);
4338	if (rbd_cid_equal(&cid, &my_cid))
4339		return result;
4340
4341	down_read(&rbd_dev->lock_rwsem);
4342	if (__rbd_is_lock_owner(rbd_dev)) {
4343		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4344		    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4345			goto out_unlock;
4346
4347		/*
4348		 * encode ResponseMessage(0) so the peer can detect
4349		 * a missing owner
4350		 */
4351		result = 0;
4352
4353		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4354			if (!rbd_dev->opts->exclusive) {
4355				dout("%s rbd_dev %p queueing unlock_work\n",
4356				     __func__, rbd_dev);
4357				queue_work(rbd_dev->task_wq,
4358					   &rbd_dev->unlock_work);
4359			} else {
4360				/* refuse to release the lock */
4361				result = -EROFS;
4362			}
4363		}
4364	}
4365
4366out_unlock:
4367	up_read(&rbd_dev->lock_rwsem);
4368	return result;
4369}
4370
4371static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4372				     u64 notify_id, u64 cookie, s32 *result)
4373{
4374	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4375	char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4376	int buf_size = sizeof(buf);
4377	int ret;
4378
4379	if (result) {
4380		void *p = buf;
4381
4382		/* encode ResponseMessage */
4383		ceph_start_encoding(&p, 1, 1,
4384				    buf_size - CEPH_ENCODING_START_BLK_LEN);
4385		ceph_encode_32(&p, *result);
4386	} else {
4387		buf_size = 0;
4388	}
4389
4390	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4391				   &rbd_dev->header_oloc, notify_id, cookie,
4392				   buf, buf_size);
4393	if (ret)
4394		rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4395}
4396
4397static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4398				   u64 cookie)
4399{
4400	dout("%s rbd_dev %p\n", __func__, rbd_dev);
4401	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4402}
4403
4404static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4405					  u64 notify_id, u64 cookie, s32 result)
4406{
4407	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4408	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4409}
4410
4411static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4412			 u64 notifier_id, void *data, size_t data_len)
4413{
4414	struct rbd_device *rbd_dev = arg;
4415	void *p = data;
4416	void *const end = p + data_len;
4417	u8 struct_v = 0;
4418	u32 len;
4419	u32 notify_op;
4420	int ret;
4421
4422	dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4423	     __func__, rbd_dev, cookie, notify_id, data_len);
4424	if (data_len) {
4425		ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4426					  &struct_v, &len);
4427		if (ret) {
4428			rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4429				 ret);
4430			return;
4431		}
4432
4433		notify_op = ceph_decode_32(&p);
4434	} else {
4435		/* legacy notification for header updates */
4436		notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4437		len = 0;
4438	}
4439
4440	dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4441	switch (notify_op) {
4442	case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4443		rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4444		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4445		break;
4446	case RBD_NOTIFY_OP_RELEASED_LOCK:
4447		rbd_handle_released_lock(rbd_dev, struct_v, &p);
4448		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4449		break;
4450	case RBD_NOTIFY_OP_REQUEST_LOCK:
4451		ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4452		if (ret <= 0)
4453			rbd_acknowledge_notify_result(rbd_dev, notify_id,
4454						      cookie, ret);
4455		else
4456			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4457		break;
4458	case RBD_NOTIFY_OP_HEADER_UPDATE:
4459		ret = rbd_dev_refresh(rbd_dev);
4460		if (ret)
4461			rbd_warn(rbd_dev, "refresh failed: %d", ret);
4462
4463		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4464		break;
4465	default:
4466		if (rbd_is_lock_owner(rbd_dev))
4467			rbd_acknowledge_notify_result(rbd_dev, notify_id,
4468						      cookie, -EOPNOTSUPP);
4469		else
4470			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4471		break;
4472	}
4473}
4474
4475static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4476
4477static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4478{
4479	struct rbd_device *rbd_dev = arg;
4480
4481	rbd_warn(rbd_dev, "encountered watch error: %d", err);
4482
4483	down_write(&rbd_dev->lock_rwsem);
4484	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4485	up_write(&rbd_dev->lock_rwsem);
4486
4487	mutex_lock(&rbd_dev->watch_mutex);
4488	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4489		__rbd_unregister_watch(rbd_dev);
4490		rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4491
4492		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4493	}
4494	mutex_unlock(&rbd_dev->watch_mutex);
4495}
4496
4497/*
4498 * watch_mutex must be locked
4499 */
4500static int __rbd_register_watch(struct rbd_device *rbd_dev)
4501{
4502	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4503	struct ceph_osd_linger_request *handle;
4504
4505	rbd_assert(!rbd_dev->watch_handle);
4506	dout("%s rbd_dev %p\n", __func__, rbd_dev);
4507
4508	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4509				 &rbd_dev->header_oloc, rbd_watch_cb,
4510				 rbd_watch_errcb, rbd_dev);
4511	if (IS_ERR(handle))
4512		return PTR_ERR(handle);
4513
4514	rbd_dev->watch_handle = handle;
4515	return 0;
4516}
4517
4518/*
4519 * watch_mutex must be locked
4520 */
4521static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4522{
4523	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4524	int ret;
4525
4526	rbd_assert(rbd_dev->watch_handle);
4527	dout("%s rbd_dev %p\n", __func__, rbd_dev);
4528
4529	ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4530	if (ret)
4531		rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4532
4533	rbd_dev->watch_handle = NULL;
4534}
4535
4536static int rbd_register_watch(struct rbd_device *rbd_dev)
4537{
4538	int ret;
4539
4540	mutex_lock(&rbd_dev->watch_mutex);
4541	rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4542	ret = __rbd_register_watch(rbd_dev);
4543	if (ret)
4544		goto out;
4545
4546	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4547	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4548
4549out:
4550	mutex_unlock(&rbd_dev->watch_mutex);
4551	return ret;
4552}
4553
/*
 * Synchronously cancel the device's lock-related work items:
 * acquired_lock_work, released_lock_work, lock_dwork and unlock_work.
 * watch_dwork is not touched here.
 */
static void cancel_tasks_sync(struct rbd_device *rbd_dev)
{
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	cancel_work_sync(&rbd_dev->acquired_lock_work);
	cancel_work_sync(&rbd_dev->released_lock_work);
	cancel_delayed_work_sync(&rbd_dev->lock_dwork);
	cancel_work_sync(&rbd_dev->unlock_work);
}
4563
4564/*
4565 * header_rwsem must not be held to avoid a deadlock with
4566 * rbd_dev_refresh() when flushing notifies.
4567 */
4568static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4569{
4570	cancel_tasks_sync(rbd_dev);
4571
4572	mutex_lock(&rbd_dev->watch_mutex);
4573	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4574		__rbd_unregister_watch(rbd_dev);
4575	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4576	mutex_unlock(&rbd_dev->watch_mutex);
4577
4578	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4579	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4580}
4581
4582/*
4583 * lock_rwsem must be held for write
4584 */
4585static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4586{
4587	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4588	char cookie[32];
4589	int ret;
4590
4591	if (!rbd_quiesce_lock(rbd_dev))
4592		return;
4593
4594	format_lock_cookie(rbd_dev, cookie);
4595	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4596				  &rbd_dev->header_oloc, RBD_LOCK_NAME,
4597				  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4598				  RBD_LOCK_TAG, cookie);
4599	if (ret) {
4600		if (ret != -EOPNOTSUPP)
4601			rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4602				 ret);
4603
4604		/*
4605		 * Lock cookie cannot be updated on older OSDs, so do
4606		 * a manual release and queue an acquire.
4607		 */
4608		__rbd_release_lock(rbd_dev);
4609		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4610	} else {
4611		__rbd_lock(rbd_dev, cookie);
4612		wake_lock_waiters(rbd_dev, 0);
4613	}
4614}
4615
4616static void rbd_reregister_watch(struct work_struct *work)
4617{
4618	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4619					    struct rbd_device, watch_dwork);
4620	int ret;
4621
4622	dout("%s rbd_dev %p\n", __func__, rbd_dev);
4623
4624	mutex_lock(&rbd_dev->watch_mutex);
4625	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4626		mutex_unlock(&rbd_dev->watch_mutex);
4627		return;
4628	}
4629
4630	ret = __rbd_register_watch(rbd_dev);
4631	if (ret) {
4632		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4633		if (ret != -EBLOCKLISTED && ret != -ENOENT) {
4634			queue_delayed_work(rbd_dev->task_wq,
4635					   &rbd_dev->watch_dwork,
4636					   RBD_RETRY_DELAY);
4637			mutex_unlock(&rbd_dev->watch_mutex);
4638			return;
4639		}
4640
4641		mutex_unlock(&rbd_dev->watch_mutex);
4642		down_write(&rbd_dev->lock_rwsem);
4643		wake_lock_waiters(rbd_dev, ret);
4644		up_write(&rbd_dev->lock_rwsem);
4645		return;
4646	}
4647
4648	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4649	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4650	mutex_unlock(&rbd_dev->watch_mutex);
4651
4652	down_write(&rbd_dev->lock_rwsem);
4653	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4654		rbd_reacquire_lock(rbd_dev);
4655	up_write(&rbd_dev->lock_rwsem);
4656
4657	ret = rbd_dev_refresh(rbd_dev);
4658	if (ret)
4659		rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4660}
4661
4662/*
4663 * Synchronous osd object method call.  Returns the number of bytes
4664 * returned in the outbound buffer, or a negative error code.
4665 */
4666static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4667			     struct ceph_object_id *oid,
4668			     struct ceph_object_locator *oloc,
4669			     const char *method_name,
4670			     const void *outbound,
4671			     size_t outbound_size,
4672			     void *inbound,
4673			     size_t inbound_size)
4674{
4675	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4676	struct page *req_page = NULL;
4677	struct page *reply_page;
4678	int ret;
4679
4680	/*
4681	 * Method calls are ultimately read operations.  The result
4682	 * should placed into the inbound buffer provided.  They
4683	 * also supply outbound data--parameters for the object
4684	 * method.  Currently if this is present it will be a
4685	 * snapshot id.
4686	 */
4687	if (outbound) {
4688		if (outbound_size > PAGE_SIZE)
4689			return -E2BIG;
4690
4691		req_page = alloc_page(GFP_KERNEL);
4692		if (!req_page)
4693			return -ENOMEM;
4694
4695		memcpy(page_address(req_page), outbound, outbound_size);
4696	}
4697
4698	reply_page = alloc_page(GFP_KERNEL);
4699	if (!reply_page) {
4700		if (req_page)
4701			__free_page(req_page);
4702		return -ENOMEM;
4703	}
4704
4705	ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4706			     CEPH_OSD_FLAG_READ, req_page, outbound_size,
4707			     &reply_page, &inbound_size);
4708	if (!ret) {
4709		memcpy(inbound, page_address(reply_page), inbound_size);
4710		ret = inbound_size;
4711	}
4712
4713	if (req_page)
4714		__free_page(req_page);
4715	__free_page(reply_page);
4716	return ret;
4717}
4718
4719static void rbd_queue_workfn(struct work_struct *work)
4720{
4721	struct rbd_img_request *img_request =
4722	    container_of(work, struct rbd_img_request, work);
4723	struct rbd_device *rbd_dev = img_request->rbd_dev;
4724	enum obj_operation_type op_type = img_request->op_type;
4725	struct request *rq = blk_mq_rq_from_pdu(img_request);
4726	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4727	u64 length = blk_rq_bytes(rq);
4728	u64 mapping_size;
4729	int result;
4730
4731	/* Ignore/skip any zero-length requests */
4732	if (!length) {
4733		dout("%s: zero-length request\n", __func__);
4734		result = 0;
4735		goto err_img_request;
4736	}
4737
4738	blk_mq_start_request(rq);
4739
4740	down_read(&rbd_dev->header_rwsem);
4741	mapping_size = rbd_dev->mapping.size;
4742	rbd_img_capture_header(img_request);
4743	up_read(&rbd_dev->header_rwsem);
4744
4745	if (offset + length > mapping_size) {
4746		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4747			 length, mapping_size);
4748		result = -EIO;
4749		goto err_img_request;
4750	}
4751
4752	dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4753	     img_request, obj_op_name(op_type), offset, length);
4754
4755	if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4756		result = rbd_img_fill_nodata(img_request, offset, length);
4757	else
4758		result = rbd_img_fill_from_bio(img_request, offset, length,
4759					       rq->bio);
4760	if (result)
4761		goto err_img_request;
4762
4763	rbd_img_handle_request(img_request, 0);
4764	return;
4765
4766err_img_request:
4767	rbd_img_request_destroy(img_request);
4768	if (result)
4769		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4770			 obj_op_name(op_type), length, offset, result);
4771	blk_mq_end_request(rq, errno_to_blk_status(result));
4772}
4773
4774static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4775		const struct blk_mq_queue_data *bd)
4776{
4777	struct rbd_device *rbd_dev = hctx->queue->queuedata;
4778	struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
4779	enum obj_operation_type op_type;
4780
4781	switch (req_op(bd->rq)) {
4782	case REQ_OP_DISCARD:
4783		op_type = OBJ_OP_DISCARD;
4784		break;
4785	case REQ_OP_WRITE_ZEROES:
4786		op_type = OBJ_OP_ZEROOUT;
4787		break;
4788	case REQ_OP_WRITE:
4789		op_type = OBJ_OP_WRITE;
4790		break;
4791	case REQ_OP_READ:
4792		op_type = OBJ_OP_READ;
4793		break;
4794	default:
4795		rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
4796		return BLK_STS_IOERR;
4797	}
4798
4799	rbd_img_request_init(img_req, rbd_dev, op_type);
4800
4801	if (rbd_img_is_write(img_req)) {
4802		if (rbd_is_ro(rbd_dev)) {
4803			rbd_warn(rbd_dev, "%s on read-only mapping",
4804				 obj_op_name(img_req->op_type));
4805			return BLK_STS_IOERR;
4806		}
4807		rbd_assert(!rbd_is_snap(rbd_dev));
4808	}
4809
4810	INIT_WORK(&img_req->work, rbd_queue_workfn);
4811	queue_work(rbd_wq, &img_req->work);
4812	return BLK_STS_OK;
4813}
4814
4815static void rbd_free_disk(struct rbd_device *rbd_dev)
4816{
4817	put_disk(rbd_dev->disk);
4818	blk_mq_free_tag_set(&rbd_dev->tag_set);
4819	rbd_dev->disk = NULL;
4820}
4821
4822static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4823			     struct ceph_object_id *oid,
4824			     struct ceph_object_locator *oloc,
4825			     void *buf, int buf_len)
4826
4827{
4828	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4829	struct ceph_osd_request *req;
4830	struct page **pages;
4831	int num_pages = calc_pages_for(0, buf_len);
4832	int ret;
4833
4834	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4835	if (!req)
4836		return -ENOMEM;
4837
4838	ceph_oid_copy(&req->r_base_oid, oid);
4839	ceph_oloc_copy(&req->r_base_oloc, oloc);
4840	req->r_flags = CEPH_OSD_FLAG_READ;
4841
4842	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4843	if (IS_ERR(pages)) {
4844		ret = PTR_ERR(pages);
4845		goto out_req;
4846	}
4847
4848	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4849	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4850					 true);
4851
4852	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4853	if (ret)
4854		goto out_req;
4855
4856	ceph_osdc_start_request(osdc, req);
4857	ret = ceph_osdc_wait_request(osdc, req);
4858	if (ret >= 0)
4859		ceph_copy_from_page_vector(pages, buf, 0, ret);
4860
4861out_req:
4862	ceph_osdc_put_request(req);
4863	return ret;
4864}
4865
4866/*
4867 * Read the complete header for the given rbd device.  On successful
4868 * return, the rbd_dev->header field will contain up-to-date
4869 * information about the image.
4870 */
4871static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
4872				  struct rbd_image_header *header,
4873				  bool first_time)
4874{
4875	struct rbd_image_header_ondisk *ondisk = NULL;
4876	u32 snap_count = 0;
4877	u64 names_size = 0;
4878	u32 want_count;
4879	int ret;
4880
4881	/*
4882	 * The complete header will include an array of its 64-bit
4883	 * snapshot ids, followed by the names of those snapshots as
4884	 * a contiguous block of NUL-terminated strings.  Note that
4885	 * the number of snapshots could change by the time we read
4886	 * it in, in which case we re-read it.
4887	 */
4888	do {
4889		size_t size;
4890
4891		kfree(ondisk);
4892
4893		size = sizeof (*ondisk);
4894		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4895		size += names_size;
4896		ondisk = kmalloc(size, GFP_KERNEL);
4897		if (!ondisk)
4898			return -ENOMEM;
4899
4900		ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4901					&rbd_dev->header_oloc, ondisk, size);
4902		if (ret < 0)
4903			goto out;
4904		if ((size_t)ret < size) {
4905			ret = -ENXIO;
4906			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4907				size, ret);
4908			goto out;
4909		}
4910		if (!rbd_dev_ondisk_valid(ondisk)) {
4911			ret = -ENXIO;
4912			rbd_warn(rbd_dev, "invalid header");
4913			goto out;
4914		}
4915
4916		names_size = le64_to_cpu(ondisk->snap_names_len);
4917		want_count = snap_count;
4918		snap_count = le32_to_cpu(ondisk->snap_count);
4919	} while (snap_count != want_count);
4920
4921	ret = rbd_header_from_disk(header, ondisk, first_time);
4922out:
4923	kfree(ondisk);
4924
4925	return ret;
4926}
4927
4928static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4929{
4930	sector_t size;
4931
4932	/*
4933	 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4934	 * try to update its size.  If REMOVING is set, updating size
4935	 * is just useless work since the device can't be opened.
4936	 */
4937	if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4938	    !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4939		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4940		dout("setting size to %llu sectors", (unsigned long long)size);
4941		set_capacity_and_notify(rbd_dev->disk, size);
4942	}
4943}
4944
/* blk-mq operations for rbd disks; installed by rbd_init_disk(). */
static const struct blk_mq_ops rbd_mq_ops = {
	.queue_rq	= rbd_queue_rq,
};
4948
4949static int rbd_init_disk(struct rbd_device *rbd_dev)
4950{
4951	struct gendisk *disk;
4952	struct request_queue *q;
4953	unsigned int objset_bytes =
4954	    rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
4955	struct queue_limits lim = {
4956		.max_hw_sectors		= objset_bytes >> SECTOR_SHIFT,
4957		.max_user_sectors	= objset_bytes >> SECTOR_SHIFT,
4958		.io_min			= rbd_dev->opts->alloc_size,
4959		.io_opt			= rbd_dev->opts->alloc_size,
4960		.max_segments		= USHRT_MAX,
4961		.max_segment_size	= UINT_MAX,
4962	};
4963	int err;
4964
4965	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4966	rbd_dev->tag_set.ops = &rbd_mq_ops;
4967	rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4968	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4969	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
4970	rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
4971	rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
4972
4973	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4974	if (err)
4975		return err;
4976
4977	if (rbd_dev->opts->trim) {
4978		lim.discard_granularity = rbd_dev->opts->alloc_size;
4979		lim.max_hw_discard_sectors = objset_bytes >> SECTOR_SHIFT;
4980		lim.max_write_zeroes_sectors = objset_bytes >> SECTOR_SHIFT;
4981	}
4982
4983	disk = blk_mq_alloc_disk(&rbd_dev->tag_set, &lim, rbd_dev);
4984	if (IS_ERR(disk)) {
4985		err = PTR_ERR(disk);
4986		goto out_tag_set;
4987	}
4988	q = disk->queue;
4989
4990	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4991		 rbd_dev->dev_id);
4992	disk->major = rbd_dev->major;
4993	disk->first_minor = rbd_dev->minor;
4994	if (single_major)
4995		disk->minors = (1 << RBD_SINGLE_MAJOR_PART_SHIFT);
4996	else
4997		disk->minors = RBD_MINORS_PER_MAJOR;
4998	disk->fops = &rbd_bd_ops;
4999	disk->private_data = rbd_dev;
5000
5001	blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
5002	/* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
5003
5004	if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
5005		blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);
5006
5007	rbd_dev->disk = disk;
5008
5009	return 0;
5010out_tag_set:
5011	blk_mq_free_tag_set(&rbd_dev->tag_set);
5012	return err;
5013}
5014
5015/*
5016  sysfs
5017*/
5018
5019static struct rbd_device *dev_to_rbd_dev(struct device *dev)
5020{
5021	return container_of(dev, struct rbd_device, dev);
5022}
5023
5024static ssize_t rbd_size_show(struct device *dev,
5025			     struct device_attribute *attr, char *buf)
5026{
5027	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5028
5029	return sprintf(buf, "%llu\n",
5030		(unsigned long long)rbd_dev->mapping.size);
5031}
5032
5033static ssize_t rbd_features_show(struct device *dev,
5034			     struct device_attribute *attr, char *buf)
5035{
5036	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5037
5038	return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
5039}
5040
5041static ssize_t rbd_major_show(struct device *dev,
5042			      struct device_attribute *attr, char *buf)
5043{
5044	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5045
5046	if (rbd_dev->major)
5047		return sprintf(buf, "%d\n", rbd_dev->major);
5048
5049	return sprintf(buf, "(none)\n");
5050}
5051
5052static ssize_t rbd_minor_show(struct device *dev,
5053			      struct device_attribute *attr, char *buf)
5054{
5055	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5056
5057	return sprintf(buf, "%d\n", rbd_dev->minor);
5058}
5059
5060static ssize_t rbd_client_addr_show(struct device *dev,
5061				    struct device_attribute *attr, char *buf)
5062{
5063	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5064	struct ceph_entity_addr *client_addr =
5065	    ceph_client_addr(rbd_dev->rbd_client->client);
5066
5067	return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5068		       le32_to_cpu(client_addr->nonce));
5069}
5070
5071static ssize_t rbd_client_id_show(struct device *dev,
5072				  struct device_attribute *attr, char *buf)
5073{
5074	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5075
5076	return sprintf(buf, "client%lld\n",
5077		       ceph_client_gid(rbd_dev->rbd_client->client));
5078}
5079
5080static ssize_t rbd_cluster_fsid_show(struct device *dev,
5081				     struct device_attribute *attr, char *buf)
5082{
5083	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5084
5085	return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5086}
5087
5088static ssize_t rbd_config_info_show(struct device *dev,
5089				    struct device_attribute *attr, char *buf)
5090{
5091	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5092
5093	if (!capable(CAP_SYS_ADMIN))
5094		return -EPERM;
5095
5096	return sprintf(buf, "%s\n", rbd_dev->config_info);
5097}
5098
5099static ssize_t rbd_pool_show(struct device *dev,
5100			     struct device_attribute *attr, char *buf)
5101{
5102	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5103
5104	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5105}
5106
5107static ssize_t rbd_pool_id_show(struct device *dev,
5108			     struct device_attribute *attr, char *buf)
5109{
5110	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5111
5112	return sprintf(buf, "%llu\n",
5113			(unsigned long long) rbd_dev->spec->pool_id);
5114}
5115
5116static ssize_t rbd_pool_ns_show(struct device *dev,
5117				struct device_attribute *attr, char *buf)
5118{
5119	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5120
5121	return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5122}
5123
5124static ssize_t rbd_name_show(struct device *dev,
5125			     struct device_attribute *attr, char *buf)
5126{
5127	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5128
5129	if (rbd_dev->spec->image_name)
5130		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5131
5132	return sprintf(buf, "(unknown)\n");
5133}
5134
5135static ssize_t rbd_image_id_show(struct device *dev,
5136			     struct device_attribute *attr, char *buf)
5137{
5138	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5139
5140	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5141}
5142
5143/*
5144 * Shows the name of the currently-mapped snapshot (or
5145 * RBD_SNAP_HEAD_NAME for the base image).
5146 */
5147static ssize_t rbd_snap_show(struct device *dev,
5148			     struct device_attribute *attr,
5149			     char *buf)
5150{
5151	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5152
5153	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5154}
5155
5156static ssize_t rbd_snap_id_show(struct device *dev,
5157				struct device_attribute *attr, char *buf)
5158{
5159	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5160
5161	return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5162}
5163
5164/*
5165 * For a v2 image, shows the chain of parent images, separated by empty
5166 * lines.  For v1 images or if there is no parent, shows "(no parent
5167 * image)".
5168 */
5169static ssize_t rbd_parent_show(struct device *dev,
5170			       struct device_attribute *attr,
5171			       char *buf)
5172{
5173	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5174	ssize_t count = 0;
5175
5176	if (!rbd_dev->parent)
5177		return sprintf(buf, "(no parent image)\n");
5178
5179	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5180		struct rbd_spec *spec = rbd_dev->parent_spec;
5181
5182		count += sprintf(&buf[count], "%s"
5183			    "pool_id %llu\npool_name %s\n"
5184			    "pool_ns %s\n"
5185			    "image_id %s\nimage_name %s\n"
5186			    "snap_id %llu\nsnap_name %s\n"
5187			    "overlap %llu\n",
5188			    !count ? "" : "\n", /* first? */
5189			    spec->pool_id, spec->pool_name,
5190			    spec->pool_ns ?: "",
5191			    spec->image_id, spec->image_name ?: "(unknown)",
5192			    spec->snap_id, spec->snap_name,
5193			    rbd_dev->parent_overlap);
5194	}
5195
5196	return count;
5197}
5198
5199static ssize_t rbd_image_refresh(struct device *dev,
5200				 struct device_attribute *attr,
5201				 const char *buf,
5202				 size_t size)
5203{
5204	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5205	int ret;
5206
5207	if (!capable(CAP_SYS_ADMIN))
5208		return -EPERM;
5209
5210	ret = rbd_dev_refresh(rbd_dev);
5211	if (ret)
5212		return ret;
5213
5214	return size;
5215}
5216
/*
 * Per-device sysfs attributes.  All are read-only for everyone (0444)
 * except config_info, which is owner-read-only (0400), and refresh,
 * which is owner-write-only (0200) and has no show handler.
 */
static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5234
/* NULL-terminated list of the attributes in the rbd attribute group. */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_minor.attr,
	&dev_attr_client_addr.attr,
	&dev_attr_client_id.attr,
	&dev_attr_cluster_fsid.attr,
	&dev_attr_config_info.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_pool_ns.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};
5255
/* The single (unnamed) attribute group holding rbd_attrs. */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type. */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
5264
5265static void rbd_dev_release(struct device *dev);
5266
/*
 * Device type assigned to every rbd_device in __rbd_dev_create(): it
 * supplies the sysfs attribute groups and the release callback.
 */
static const struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_dev_release,
};
5272
5273static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5274{
5275	kref_get(&spec->kref);
5276
5277	return spec;
5278}
5279
5280static void rbd_spec_free(struct kref *kref);
5281static void rbd_spec_put(struct rbd_spec *spec)
5282{
5283	if (spec)
5284		kref_put(&spec->kref, rbd_spec_free);
5285}
5286
5287static struct rbd_spec *rbd_spec_alloc(void)
5288{
5289	struct rbd_spec *spec;
5290
5291	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5292	if (!spec)
5293		return NULL;
5294
5295	spec->pool_id = CEPH_NOPOOL;
5296	spec->snap_id = CEPH_NOSNAP;
5297	kref_init(&spec->kref);
5298
5299	return spec;
5300}
5301
5302static void rbd_spec_free(struct kref *kref)
5303{
5304	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5305
5306	kfree(spec->pool_name);
5307	kfree(spec->pool_ns);
5308	kfree(spec->image_id);
5309	kfree(spec->image_name);
5310	kfree(spec->snap_name);
5311	kfree(spec);
5312}
5313
/*
 * Release everything an rbd_device points at (header oid/oloc,
 * config_info, client, spec, opts) and free the device itself.
 * Warns if the watch is still registered or the lock is still held.
 */
static void rbd_dev_free(struct rbd_device *rbd_dev)
{
	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);

	ceph_oid_destroy(&rbd_dev->header_oid);
	ceph_oloc_destroy(&rbd_dev->header_oloc);
	kfree(rbd_dev->config_info);

	/* drop the client and spec references, then free the options */
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev->opts);
	kfree(rbd_dev);
}
5328
5329static void rbd_dev_release(struct device *dev)
5330{
5331	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5332	bool need_put = !!rbd_dev->opts;
5333
5334	if (need_put) {
5335		destroy_workqueue(rbd_dev->task_wq);
5336		ida_free(&rbd_dev_id_ida, rbd_dev->dev_id);
5337	}
5338
5339	rbd_dev_free(rbd_dev);
5340
5341	/*
5342	 * This is racy, but way better than putting module outside of
5343	 * the release callback.  The race window is pretty small, so
5344	 * doing something similar to dm (dm-builtin.c) is overkill.
5345	 */
5346	if (need_put)
5347		module_put(THIS_MODULE);
5348}
5349
/*
 * Allocate an rbd_device and initialize its locks, lists, work items,
 * completions and embedded struct device.  The header object locator
 * is seeded from @spec's pool id and, if set, pool namespace.
 *
 * @spec is only read here; it is not stored in the device.
 * Returns the new device, or NULL if the allocation fails.
 */
static struct rbd_device *__rbd_dev_create(struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->header.data_pool_id = CEPH_NOPOOL;
	ceph_oid_init(&rbd_dev->header_oid);
	rbd_dev->header_oloc.pool = spec->pool_id;
	if (spec->pool_ns) {
		/* a non-NULL namespace is not expected to be empty */
		WARN_ON(!*spec->pool_ns);
		rbd_dev->header_oloc.pool_ns =
		    ceph_find_or_create_string(spec->pool_ns,
					       strlen(spec->pool_ns));
	}

	/* watch state */
	mutex_init(&rbd_dev->watch_mutex);
	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);

	/* lock state */
	init_rwsem(&rbd_dev->lock_rwsem);
	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
	INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
	INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
	INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
	INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
	spin_lock_init(&rbd_dev->lock_lists_lock);
	INIT_LIST_HEAD(&rbd_dev->acquiring_list);
	INIT_LIST_HEAD(&rbd_dev->running_list);
	init_completion(&rbd_dev->acquire_wait);
	init_completion(&rbd_dev->releasing_wait);

	spin_lock_init(&rbd_dev->object_map_lock);

	/* embedded struct device: bus, type and parent, then initialize */
	rbd_dev->dev.bus = &rbd_bus_type;
	rbd_dev->dev.type = &rbd_device_type;
	rbd_dev->dev.parent = &rbd_root_dev;
	device_initialize(&rbd_dev->dev);

	return rbd_dev;
}
5397
5398/*
5399 * Create a mapping rbd_dev.
5400 */
5401static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5402					 struct rbd_spec *spec,
5403					 struct rbd_options *opts)
5404{
5405	struct rbd_device *rbd_dev;
5406
5407	rbd_dev = __rbd_dev_create(spec);
5408	if (!rbd_dev)
5409		return NULL;
5410
5411	/* get an id and fill in device name */
5412	rbd_dev->dev_id = ida_alloc_max(&rbd_dev_id_ida,
5413					minor_to_rbd_dev_id(1 << MINORBITS) - 1,
5414					GFP_KERNEL);
5415	if (rbd_dev->dev_id < 0)
5416		goto fail_rbd_dev;
5417
5418	sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5419	rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5420						   rbd_dev->name);
5421	if (!rbd_dev->task_wq)
5422		goto fail_dev_id;
5423
5424	/* we have a ref from do_rbd_add() */
5425	__module_get(THIS_MODULE);
5426
5427	rbd_dev->rbd_client = rbdc;
5428	rbd_dev->spec = spec;
5429	rbd_dev->opts = opts;
5430
5431	dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5432	return rbd_dev;
5433
5434fail_dev_id:
5435	ida_free(&rbd_dev_id_ida, rbd_dev->dev_id);
5436fail_rbd_dev:
5437	rbd_dev_free(rbd_dev);
5438	return NULL;
5439}
5440
5441static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5442{
5443	if (rbd_dev)
5444		put_device(&rbd_dev->dev);
5445}
5446
5447/*
5448 * Get the size and object order for an image snapshot, or if
5449 * snap_id is CEPH_NOSNAP, gets this information for the base
5450 * image.
5451 */
5452static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5453				u8 *order, u64 *snap_size)
5454{
5455	__le64 snapid = cpu_to_le64(snap_id);
5456	int ret;
5457	struct {
5458		u8 order;
5459		__le64 size;
5460	} __attribute__ ((packed)) size_buf = { 0 };
5461
5462	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5463				  &rbd_dev->header_oloc, "get_size",
5464				  &snapid, sizeof(snapid),
5465				  &size_buf, sizeof(size_buf));
5466	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5467	if (ret < 0)
5468		return ret;
5469	if (ret < sizeof (size_buf))
5470		return -ERANGE;
5471
5472	if (order) {
5473		*order = size_buf.order;
5474		dout("  order %u", (unsigned int)*order);
5475	}
5476	*snap_size = le64_to_cpu(size_buf.size);
5477
5478	dout("  snap_id 0x%016llx snap_size = %llu\n",
5479		(unsigned long long)snap_id,
5480		(unsigned long long)*snap_size);
5481
5482	return 0;
5483}
5484
5485static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev,
5486				    char **pobject_prefix)
5487{
5488	size_t size;
5489	void *reply_buf;
5490	char *object_prefix;
5491	int ret;
5492	void *p;
5493
5494	/* Response will be an encoded string, which includes a length */
5495	size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5496	reply_buf = kzalloc(size, GFP_KERNEL);
5497	if (!reply_buf)
5498		return -ENOMEM;
5499
5500	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5501				  &rbd_dev->header_oloc, "get_object_prefix",
5502				  NULL, 0, reply_buf, size);
5503	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5504	if (ret < 0)
5505		goto out;
5506
5507	p = reply_buf;
5508	object_prefix = ceph_extract_encoded_string(&p, p + ret, NULL,
5509						    GFP_NOIO);
5510	if (IS_ERR(object_prefix)) {
5511		ret = PTR_ERR(object_prefix);
5512		goto out;
5513	}
5514	ret = 0;
5515
5516	*pobject_prefix = object_prefix;
5517	dout("  object_prefix = %s\n", object_prefix);
5518out:
5519	kfree(reply_buf);
5520
5521	return ret;
5522}
5523
5524static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5525				     bool read_only, u64 *snap_features)
5526{
5527	struct {
5528		__le64 snap_id;
5529		u8 read_only;
5530	} features_in;
5531	struct {
5532		__le64 features;
5533		__le64 incompat;
5534	} __attribute__ ((packed)) features_buf = { 0 };
5535	u64 unsup;
5536	int ret;
5537
5538	features_in.snap_id = cpu_to_le64(snap_id);
5539	features_in.read_only = read_only;
5540
5541	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5542				  &rbd_dev->header_oloc, "get_features",
5543				  &features_in, sizeof(features_in),
5544				  &features_buf, sizeof(features_buf));
5545	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5546	if (ret < 0)
5547		return ret;
5548	if (ret < sizeof (features_buf))
5549		return -ERANGE;
5550
5551	unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5552	if (unsup) {
5553		rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5554			 unsup);
5555		return -ENXIO;
5556	}
5557
5558	*snap_features = le64_to_cpu(features_buf.features);
5559
5560	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5561		(unsigned long long)snap_id,
5562		(unsigned long long)*snap_features,
5563		(unsigned long long)le64_to_cpu(features_buf.incompat));
5564
5565	return 0;
5566}
5567
5568/*
5569 * These are generic image flags, but since they are used only for
5570 * object map, store them in rbd_dev->object_map_flags.
5571 *
5572 * For the same reason, this function is called only on object map
5573 * (re)load and not on header refresh.
5574 */
5575static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5576{
5577	__le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5578	__le64 flags;
5579	int ret;
5580
5581	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5582				  &rbd_dev->header_oloc, "get_flags",
5583				  &snapid, sizeof(snapid),
5584				  &flags, sizeof(flags));
5585	if (ret < 0)
5586		return ret;
5587	if (ret < sizeof(flags))
5588		return -EBADMSG;
5589
5590	rbd_dev->object_map_flags = le64_to_cpu(flags);
5591	return 0;
5592}
5593
/*
 * Parent (clone source) description as decoded from the header object
 * replies by __get_parent_info() / __get_parent_info_legacy().
 *
 * pool_ns and image_id are strings obtained from
 * ceph_extract_encoded_string() and are released with kfree() by
 * rbd_parent_info_cleanup().
 */
struct parent_image_info {
	u64		pool_id;	/* CEPH_NOPOOL is treated as "no parent" */
	const char	*pool_ns;	/* not filled in by the legacy query */
	const char	*image_id;
	u64		snap_id;

	bool		has_overlap;	/* legacy query always sets this to true */
	u64		overlap;	/* new-style query decodes it only if has_overlap */
};
5603
5604static void rbd_parent_info_cleanup(struct parent_image_info *pii)
5605{
5606	kfree(pii->pool_ns);
5607	kfree(pii->image_id);
5608
5609	memset(pii, 0, sizeof(*pii));
5610}
5611
5612/*
5613 * The caller is responsible for @pii.
5614 */
5615static int decode_parent_image_spec(void **p, void *end,
5616				    struct parent_image_info *pii)
5617{
5618	u8 struct_v;
5619	u32 struct_len;
5620	int ret;
5621
5622	ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5623				  &struct_v, &struct_len);
5624	if (ret)
5625		return ret;
5626
5627	ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5628	pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5629	if (IS_ERR(pii->pool_ns)) {
5630		ret = PTR_ERR(pii->pool_ns);
5631		pii->pool_ns = NULL;
5632		return ret;
5633	}
5634	pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5635	if (IS_ERR(pii->image_id)) {
5636		ret = PTR_ERR(pii->image_id);
5637		pii->image_id = NULL;
5638		return ret;
5639	}
5640	ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5641	return 0;
5642
5643e_inval:
5644	return -EINVAL;
5645}
5646
5647static int __get_parent_info(struct rbd_device *rbd_dev,
5648			     struct page *req_page,
5649			     struct page *reply_page,
5650			     struct parent_image_info *pii)
5651{
5652	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5653	size_t reply_len = PAGE_SIZE;
5654	void *p, *end;
5655	int ret;
5656
5657	ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5658			     "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5659			     req_page, sizeof(u64), &reply_page, &reply_len);
5660	if (ret)
5661		return ret == -EOPNOTSUPP ? 1 : ret;
5662
5663	p = page_address(reply_page);
5664	end = p + reply_len;
5665	ret = decode_parent_image_spec(&p, end, pii);
5666	if (ret)
5667		return ret;
5668
5669	ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5670			     "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5671			     req_page, sizeof(u64), &reply_page, &reply_len);
5672	if (ret)
5673		return ret;
5674
5675	p = page_address(reply_page);
5676	end = p + reply_len;
5677	ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5678	if (pii->has_overlap)
5679		ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5680
5681	dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5682	     __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id,
5683	     pii->has_overlap, pii->overlap);
5684	return 0;
5685
5686e_inval:
5687	return -EINVAL;
5688}
5689
5690/*
5691 * The caller is responsible for @pii.
5692 */
5693static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5694				    struct page *req_page,
5695				    struct page *reply_page,
5696				    struct parent_image_info *pii)
5697{
5698	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5699	size_t reply_len = PAGE_SIZE;
5700	void *p, *end;
5701	int ret;
5702
5703	ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5704			     "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5705			     req_page, sizeof(u64), &reply_page, &reply_len);
5706	if (ret)
5707		return ret;
5708
5709	p = page_address(reply_page);
5710	end = p + reply_len;
5711	ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5712	pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5713	if (IS_ERR(pii->image_id)) {
5714		ret = PTR_ERR(pii->image_id);
5715		pii->image_id = NULL;
5716		return ret;
5717	}
5718	ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5719	pii->has_overlap = true;
5720	ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5721
5722	dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5723	     __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id,
5724	     pii->has_overlap, pii->overlap);
5725	return 0;
5726
5727e_inval:
5728	return -EINVAL;
5729}
5730
5731static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev,
5732				  struct parent_image_info *pii)
5733{
5734	struct page *req_page, *reply_page;
5735	void *p;
5736	int ret;
5737
5738	req_page = alloc_page(GFP_KERNEL);
5739	if (!req_page)
5740		return -ENOMEM;
5741
5742	reply_page = alloc_page(GFP_KERNEL);
5743	if (!reply_page) {
5744		__free_page(req_page);
5745		return -ENOMEM;
5746	}
5747
5748	p = page_address(req_page);
5749	ceph_encode_64(&p, rbd_dev->spec->snap_id);
5750	ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5751	if (ret > 0)
5752		ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5753					       pii);
5754
5755	__free_page(req_page);
5756	__free_page(reply_page);
5757	return ret;
5758}
5759
5760static int rbd_dev_setup_parent(struct rbd_device *rbd_dev)
5761{
5762	struct rbd_spec *parent_spec;
5763	struct parent_image_info pii = { 0 };
5764	int ret;
5765
5766	parent_spec = rbd_spec_alloc();
5767	if (!parent_spec)
5768		return -ENOMEM;
5769
5770	ret = rbd_dev_v2_parent_info(rbd_dev, &pii);
5771	if (ret)
5772		goto out_err;
5773
5774	if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap)
5775		goto out;	/* No parent?  No problem. */
5776
5777	/* The ceph file layout needs to fit pool id in 32 bits */
5778
5779	ret = -EIO;
5780	if (pii.pool_id > (u64)U32_MAX) {
5781		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5782			(unsigned long long)pii.pool_id, U32_MAX);
5783		goto out_err;
5784	}
5785
5786	/*
5787	 * The parent won't change except when the clone is flattened,
5788	 * so we only need to record the parent image spec once.
5789	 */
5790	parent_spec->pool_id = pii.pool_id;
5791	if (pii.pool_ns && *pii.pool_ns) {
5792		parent_spec->pool_ns = pii.pool_ns;
5793		pii.pool_ns = NULL;
5794	}
5795	parent_spec->image_id = pii.image_id;
5796	pii.image_id = NULL;
5797	parent_spec->snap_id = pii.snap_id;
5798
5799	rbd_assert(!rbd_dev->parent_spec);
5800	rbd_dev->parent_spec = parent_spec;
5801	parent_spec = NULL;	/* rbd_dev now owns this */
5802
5803	/*
5804	 * Record the parent overlap.  If it's zero, issue a warning as
5805	 * we will proceed as if there is no parent.
5806	 */
5807	if (!pii.overlap)
5808		rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5809	rbd_dev->parent_overlap = pii.overlap;
5810
5811out:
5812	ret = 0;
5813out_err:
5814	rbd_parent_info_cleanup(&pii);
5815	rbd_spec_put(parent_spec);
5816	return ret;
5817}
5818
5819static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev,
5820				    u64 *stripe_unit, u64 *stripe_count)
5821{
5822	struct {
5823		__le64 stripe_unit;
5824		__le64 stripe_count;
5825	} __attribute__ ((packed)) striping_info_buf = { 0 };
5826	size_t size = sizeof (striping_info_buf);
5827	int ret;
5828
5829	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5830				&rbd_dev->header_oloc, "get_stripe_unit_count",
5831				NULL, 0, &striping_info_buf, size);
5832	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5833	if (ret < 0)
5834		return ret;
5835	if (ret < size)
5836		return -ERANGE;
5837
5838	*stripe_unit = le64_to_cpu(striping_info_buf.stripe_unit);
5839	*stripe_count = le64_to_cpu(striping_info_buf.stripe_count);
5840	dout("  stripe_unit = %llu stripe_count = %llu\n", *stripe_unit,
5841	     *stripe_count);
5842
5843	return 0;
5844}
5845
5846static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev, s64 *data_pool_id)
5847{
5848	__le64 data_pool_buf;
5849	int ret;
5850
5851	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5852				  &rbd_dev->header_oloc, "get_data_pool",
5853				  NULL, 0, &data_pool_buf,
5854				  sizeof(data_pool_buf));
5855	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5856	if (ret < 0)
5857		return ret;
5858	if (ret < sizeof(data_pool_buf))
5859		return -EBADMSG;
5860
5861	*data_pool_id = le64_to_cpu(data_pool_buf);
5862	dout("  data_pool_id = %lld\n", *data_pool_id);
5863	WARN_ON(*data_pool_id == CEPH_NOPOOL);
5864
5865	return 0;
5866}
5867
5868static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5869{
5870	CEPH_DEFINE_OID_ONSTACK(oid);
5871	size_t image_id_size;
5872	char *image_id;
5873	void *p;
5874	void *end;
5875	size_t size;
5876	void *reply_buf = NULL;
5877	size_t len = 0;
5878	char *image_name = NULL;
5879	int ret;
5880
5881	rbd_assert(!rbd_dev->spec->image_name);
5882
5883	len = strlen(rbd_dev->spec->image_id);
5884	image_id_size = sizeof (__le32) + len;
5885	image_id = kmalloc(image_id_size, GFP_KERNEL);
5886	if (!image_id)
5887		return NULL;
5888
5889	p = image_id;
5890	end = image_id + image_id_size;
5891	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5892
5893	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5894	reply_buf = kmalloc(size, GFP_KERNEL);
5895	if (!reply_buf)
5896		goto out;
5897
5898	ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5899	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5900				  "dir_get_name", image_id, image_id_size,
5901				  reply_buf, size);
5902	if (ret < 0)
5903		goto out;
5904	p = reply_buf;
5905	end = reply_buf + ret;
5906
5907	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5908	if (IS_ERR(image_name))
5909		image_name = NULL;
5910	else
5911		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5912out:
5913	kfree(reply_buf);
5914	kfree(image_id);
5915
5916	return image_name;
5917}
5918
5919static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5920{
5921	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5922	const char *snap_name;
5923	u32 which = 0;
5924
5925	/* Skip over names until we find the one we are looking for */
5926
5927	snap_name = rbd_dev->header.snap_names;
5928	while (which < snapc->num_snaps) {
5929		if (!strcmp(name, snap_name))
5930			return snapc->snaps[which];
5931		snap_name += strlen(snap_name) + 1;
5932		which++;
5933	}
5934	return CEPH_NOSNAP;
5935}
5936
5937static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5938{
5939	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5940	u32 which;
5941	bool found = false;
5942	u64 snap_id;
5943
5944	for (which = 0; !found && which < snapc->num_snaps; which++) {
5945		const char *snap_name;
5946
5947		snap_id = snapc->snaps[which];
5948		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5949		if (IS_ERR(snap_name)) {
5950			/* ignore no-longer existing snapshots */
5951			if (PTR_ERR(snap_name) == -ENOENT)
5952				continue;
5953			else
5954				break;
5955		}
5956		found = !strcmp(name, snap_name);
5957		kfree(snap_name);
5958	}
5959	return found ? snap_id : CEPH_NOSNAP;
5960}
5961
5962/*
5963 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5964 * no snapshot by that name is found, or if an error occurs.
5965 */
5966static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5967{
5968	if (rbd_dev->image_format == 1)
5969		return rbd_v1_snap_id_by_name(rbd_dev, name);
5970
5971	return rbd_v2_snap_id_by_name(rbd_dev, name);
5972}
5973
5974/*
5975 * An image being mapped will have everything but the snap id.
5976 */
5977static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5978{
5979	struct rbd_spec *spec = rbd_dev->spec;
5980
5981	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5982	rbd_assert(spec->image_id && spec->image_name);
5983	rbd_assert(spec->snap_name);
5984
5985	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5986		u64 snap_id;
5987
5988		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5989		if (snap_id == CEPH_NOSNAP)
5990			return -ENOENT;
5991
5992		spec->snap_id = snap_id;
5993	} else {
5994		spec->snap_id = CEPH_NOSNAP;
5995	}
5996
5997	return 0;
5998}
5999
6000/*
6001 * A parent image will have all ids but none of the names.
6002 *
6003 * All names in an rbd spec are dynamically allocated.  It's OK if we
6004 * can't figure out the name for an image id.
6005 */
6006static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6007{
6008	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6009	struct rbd_spec *spec = rbd_dev->spec;
6010	const char *pool_name;
6011	const char *image_name;
6012	const char *snap_name;
6013	int ret;
6014
6015	rbd_assert(spec->pool_id != CEPH_NOPOOL);
6016	rbd_assert(spec->image_id);
6017	rbd_assert(spec->snap_id != CEPH_NOSNAP);
6018
6019	/* Get the pool name; we have to make our own copy of this */
6020
6021	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6022	if (!pool_name) {
6023		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6024		return -EIO;
6025	}
6026	pool_name = kstrdup(pool_name, GFP_KERNEL);
6027	if (!pool_name)
6028		return -ENOMEM;
6029
6030	/* Fetch the image name; tolerate failure here */
6031
6032	image_name = rbd_dev_image_name(rbd_dev);
6033	if (!image_name)
6034		rbd_warn(rbd_dev, "unable to get image name");
6035
6036	/* Fetch the snapshot name */
6037
6038	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6039	if (IS_ERR(snap_name)) {
6040		ret = PTR_ERR(snap_name);
6041		goto out_err;
6042	}
6043
6044	spec->pool_name = pool_name;
6045	spec->image_name = image_name;
6046	spec->snap_name = snap_name;
6047
6048	return 0;
6049
6050out_err:
6051	kfree(image_name);
6052	kfree(pool_name);
6053	return ret;
6054}
6055
6056static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev,
6057				   struct ceph_snap_context **psnapc)
6058{
6059	size_t size;
6060	int ret;
6061	void *reply_buf;
6062	void *p;
6063	void *end;
6064	u64 seq;
6065	u32 snap_count;
6066	struct ceph_snap_context *snapc;
6067	u32 i;
6068
6069	/*
6070	 * We'll need room for the seq value (maximum snapshot id),
6071	 * snapshot count, and array of that many snapshot ids.
6072	 * For now we have a fixed upper limit on the number we're
6073	 * prepared to receive.
6074	 */
6075	size = sizeof (__le64) + sizeof (__le32) +
6076			RBD_MAX_SNAP_COUNT * sizeof (__le64);
6077	reply_buf = kzalloc(size, GFP_KERNEL);
6078	if (!reply_buf)
6079		return -ENOMEM;
6080
6081	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6082				  &rbd_dev->header_oloc, "get_snapcontext",
6083				  NULL, 0, reply_buf, size);
6084	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6085	if (ret < 0)
6086		goto out;
6087
6088	p = reply_buf;
6089	end = reply_buf + ret;
6090	ret = -ERANGE;
6091	ceph_decode_64_safe(&p, end, seq, out);
6092	ceph_decode_32_safe(&p, end, snap_count, out);
6093
6094	/*
6095	 * Make sure the reported number of snapshot ids wouldn't go
6096	 * beyond the end of our buffer.  But before checking that,
6097	 * make sure the computed size of the snapshot context we
6098	 * allocate is representable in a size_t.
6099	 */
6100	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6101				 / sizeof (u64)) {
6102		ret = -EINVAL;
6103		goto out;
6104	}
6105	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6106		goto out;
6107	ret = 0;
6108
6109	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6110	if (!snapc) {
6111		ret = -ENOMEM;
6112		goto out;
6113	}
6114	snapc->seq = seq;
6115	for (i = 0; i < snap_count; i++)
6116		snapc->snaps[i] = ceph_decode_64(&p);
6117
6118	*psnapc = snapc;
6119	dout("  snap context seq = %llu, snap_count = %u\n",
6120		(unsigned long long)seq, (unsigned int)snap_count);
6121out:
6122	kfree(reply_buf);
6123
6124	return ret;
6125}
6126
6127static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6128					u64 snap_id)
6129{
6130	size_t size;
6131	void *reply_buf;
6132	__le64 snapid;
6133	int ret;
6134	void *p;
6135	void *end;
6136	char *snap_name;
6137
6138	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6139	reply_buf = kmalloc(size, GFP_KERNEL);
6140	if (!reply_buf)
6141		return ERR_PTR(-ENOMEM);
6142
6143	snapid = cpu_to_le64(snap_id);
6144	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6145				  &rbd_dev->header_oloc, "get_snapshot_name",
6146				  &snapid, sizeof(snapid), reply_buf, size);
6147	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6148	if (ret < 0) {
6149		snap_name = ERR_PTR(ret);
6150		goto out;
6151	}
6152
6153	p = reply_buf;
6154	end = reply_buf + ret;
6155	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6156	if (IS_ERR(snap_name))
6157		goto out;
6158
6159	dout("  snap_id 0x%016llx snap_name = %s\n",
6160		(unsigned long long)snap_id, snap_name);
6161out:
6162	kfree(reply_buf);
6163
6164	return snap_name;
6165}
6166
6167static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev,
6168				  struct rbd_image_header *header,
6169				  bool first_time)
6170{
6171	int ret;
6172
6173	ret = _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
6174				    first_time ? &header->obj_order : NULL,
6175				    &header->image_size);
6176	if (ret)
6177		return ret;
6178
6179	if (first_time) {
6180		ret = rbd_dev_v2_header_onetime(rbd_dev, header);
6181		if (ret)
6182			return ret;
6183	}
6184
6185	ret = rbd_dev_v2_snap_context(rbd_dev, &header->snapc);
6186	if (ret)
6187		return ret;
6188
6189	return 0;
6190}
6191
6192static int rbd_dev_header_info(struct rbd_device *rbd_dev,
6193			       struct rbd_image_header *header,
6194			       bool first_time)
6195{
6196	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6197	rbd_assert(!header->object_prefix && !header->snapc);
6198
6199	if (rbd_dev->image_format == 1)
6200		return rbd_dev_v1_header_info(rbd_dev, header, first_time);
6201
6202	return rbd_dev_v2_header_info(rbd_dev, header, first_time);
6203}
6204
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any). Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 *
 * When only white space (or nothing) remains, *buf ends up at the
 * terminating '\0' and 0 is returned.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	const char *token = *buf + strspn(*buf, spaces);

	*buf = token;			/* start of token */

	return strcspn(token, spaces);	/* token length */
}
6223
6224/*
6225 * Finds the next token in *buf, dynamically allocates a buffer big
6226 * enough to hold a copy of it, and copies the token into the new
6227 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6228 * that a duplicate buffer is created even for a zero-length token.
6229 *
6230 * Returns a pointer to the newly-allocated duplicate, or a null
6231 * pointer if memory for the duplicate was not available.  If
6232 * the lenp argument is a non-null pointer, the length of the token
6233 * (not including the '\0') is returned in *lenp.
6234 *
6235 * If successful, the *buf pointer will be updated to point beyond
6236 * the end of the found token.
6237 *
6238 * Note: uses GFP_KERNEL for allocation.
6239 */
6240static inline char *dup_token(const char **buf, size_t *lenp)
6241{
6242	char *dup;
6243	size_t len;
6244
6245	len = next_token(buf);
6246	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6247	if (!dup)
6248		return NULL;
6249	*(dup + len) = '\0';
6250	*buf += len;
6251
6252	if (lenp)
6253		*lenp = len;
6254
6255	return dup;
6256}
6257
6258static int rbd_parse_param(struct fs_parameter *param,
6259			    struct rbd_parse_opts_ctx *pctx)
6260{
6261	struct rbd_options *opt = pctx->opts;
6262	struct fs_parse_result result;
6263	struct p_log log = {.prefix = "rbd"};
6264	int token, ret;
6265
6266	ret = ceph_parse_param(param, pctx->copts, NULL);
6267	if (ret != -ENOPARAM)
6268		return ret;
6269
6270	token = __fs_parse(&log, rbd_parameters, param, &result);
6271	dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
6272	if (token < 0) {
6273		if (token == -ENOPARAM)
6274			return inval_plog(&log, "Unknown parameter '%s'",
6275					  param->key);
6276		return token;
6277	}
6278
6279	switch (token) {
6280	case Opt_queue_depth:
6281		if (result.uint_32 < 1)
6282			goto out_of_range;
6283		opt->queue_depth = result.uint_32;
6284		break;
6285	case Opt_alloc_size:
6286		if (result.uint_32 < SECTOR_SIZE)
6287			goto out_of_range;
6288		if (!is_power_of_2(result.uint_32))
6289			return inval_plog(&log, "alloc_size must be a power of 2");
6290		opt->alloc_size = result.uint_32;
6291		break;
6292	case Opt_lock_timeout:
6293		/* 0 is "wait forever" (i.e. infinite timeout) */
6294		if (result.uint_32 > INT_MAX / 1000)
6295			goto out_of_range;
6296		opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000);
6297		break;
6298	case Opt_pool_ns:
6299		kfree(pctx->spec->pool_ns);
6300		pctx->spec->pool_ns = param->string;
6301		param->string = NULL;
6302		break;
6303	case Opt_compression_hint:
6304		switch (result.uint_32) {
6305		case Opt_compression_hint_none:
6306			opt->alloc_hint_flags &=
6307			    ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
6308			      CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
6309			break;
6310		case Opt_compression_hint_compressible:
6311			opt->alloc_hint_flags |=
6312			    CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6313			opt->alloc_hint_flags &=
6314			    ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6315			break;
6316		case Opt_compression_hint_incompressible:
6317			opt->alloc_hint_flags |=
6318			    CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6319			opt->alloc_hint_flags &=
6320			    ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6321			break;
6322		default:
6323			BUG();
6324		}
6325		break;
6326	case Opt_read_only:
6327		opt->read_only = true;
6328		break;
6329	case Opt_read_write:
6330		opt->read_only = false;
6331		break;
6332	case Opt_lock_on_read:
6333		opt->lock_on_read = true;
6334		break;
6335	case Opt_exclusive:
6336		opt->exclusive = true;
6337		break;
6338	case Opt_notrim:
6339		opt->trim = false;
6340		break;
6341	default:
6342		BUG();
6343	}
6344
6345	return 0;
6346
6347out_of_range:
6348	return inval_plog(&log, "%s out of range", param->key);
6349}
6350
6351/*
6352 * This duplicates most of generic_parse_monolithic(), untying it from
6353 * fs_context and skipping standard superblock and security options.
6354 */
6355static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
6356{
6357	char *key;
6358	int ret = 0;
6359
6360	dout("%s '%s'\n", __func__, options);
6361	while ((key = strsep(&options, ",")) != NULL) {
6362		if (*key) {
6363			struct fs_parameter param = {
6364				.key	= key,
6365				.type	= fs_value_is_flag,
6366			};
6367			char *value = strchr(key, '=');
6368			size_t v_len = 0;
6369
6370			if (value) {
6371				if (value == key)
6372					continue;
6373				*value++ = 0;
6374				v_len = strlen(value);
6375				param.string = kmemdup_nul(value, v_len,
6376							   GFP_KERNEL);
6377				if (!param.string)
6378					return -ENOMEM;
6379				param.type = fs_value_is_string;
6380			}
6381			param.size = v_len;
6382
6383			ret = rbd_parse_param(&param, pctx);
6384			kfree(param.string);
6385			if (ret)
6386				break;
6387		}
6388	}
6389
6390	return ret;
6391}
6392
6393/*
6394 * Parse the options provided for an "rbd add" (i.e., rbd image
6395 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6396 * and the data written is passed here via a NUL-terminated buffer.
6397 * Returns 0 if successful or an error code otherwise.
6398 *
6399 * The information extracted from these options is recorded in
6400 * the other parameters which return dynamically-allocated
6401 * structures:
6402 *  ceph_opts
6403 *      The address of a pointer that will refer to a ceph options
6404 *      structure.  Caller must release the returned pointer using
6405 *      ceph_destroy_options() when it is no longer needed.
6406 *  rbd_opts
6407 *	Address of an rbd options pointer.  Fully initialized by
6408 *	this function; caller must release with kfree().
6409 *  spec
6410 *	Address of an rbd image specification pointer.  Fully
6411 *	initialized by this function based on parsed options.
6412 *	Caller must release with rbd_spec_put().
6413 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
6433 */
6434static int rbd_add_parse_args(const char *buf,
6435				struct ceph_options **ceph_opts,
6436				struct rbd_options **opts,
6437				struct rbd_spec **rbd_spec)
6438{
6439	size_t len;
6440	char *options;
6441	const char *mon_addrs;
6442	char *snap_name;
6443	size_t mon_addrs_size;
6444	struct rbd_parse_opts_ctx pctx = { 0 };
6445	int ret;
6446
6447	/* The first four tokens are required */
6448
6449	len = next_token(&buf);
6450	if (!len) {
6451		rbd_warn(NULL, "no monitor address(es) provided");
6452		return -EINVAL;
6453	}
6454	mon_addrs = buf;
6455	mon_addrs_size = len;
6456	buf += len;
6457
6458	ret = -EINVAL;
6459	options = dup_token(&buf, NULL);
6460	if (!options)
6461		return -ENOMEM;
6462	if (!*options) {
6463		rbd_warn(NULL, "no options provided");
6464		goto out_err;
6465	}
6466
6467	pctx.spec = rbd_spec_alloc();
6468	if (!pctx.spec)
6469		goto out_mem;
6470
6471	pctx.spec->pool_name = dup_token(&buf, NULL);
6472	if (!pctx.spec->pool_name)
6473		goto out_mem;
6474	if (!*pctx.spec->pool_name) {
6475		rbd_warn(NULL, "no pool name provided");
6476		goto out_err;
6477	}
6478
6479	pctx.spec->image_name = dup_token(&buf, NULL);
6480	if (!pctx.spec->image_name)
6481		goto out_mem;
6482	if (!*pctx.spec->image_name) {
6483		rbd_warn(NULL, "no image name provided");
6484		goto out_err;
6485	}
6486
6487	/*
6488	 * Snapshot name is optional; default is to use "-"
6489	 * (indicating the head/no snapshot).
6490	 */
6491	len = next_token(&buf);
6492	if (!len) {
6493		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6494		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6495	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
6496		ret = -ENAMETOOLONG;
6497		goto out_err;
6498	}
6499	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6500	if (!snap_name)
6501		goto out_mem;
6502	*(snap_name + len) = '\0';
6503	pctx.spec->snap_name = snap_name;
6504
6505	pctx.copts = ceph_alloc_options();
6506	if (!pctx.copts)
6507		goto out_mem;
6508
6509	/* Initialize all rbd options to the defaults */
6510
6511	pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6512	if (!pctx.opts)
6513		goto out_mem;
6514
6515	pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6516	pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6517	pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6518	pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6519	pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6520	pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6521	pctx.opts->trim = RBD_TRIM_DEFAULT;
6522
6523	ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL,
6524				 ',');
6525	if (ret)
6526		goto out_err;
6527
6528	ret = rbd_parse_options(options, &pctx);
6529	if (ret)
6530		goto out_err;
6531
6532	*ceph_opts = pctx.copts;
6533	*opts = pctx.opts;
6534	*rbd_spec = pctx.spec;
6535	kfree(options);
6536	return 0;
6537
6538out_mem:
6539	ret = -ENOMEM;
6540out_err:
6541	kfree(pctx.opts);
6542	ceph_destroy_options(pctx.copts);
6543	rbd_spec_put(pctx.spec);
6544	kfree(options);
6545	return ret;
6546}
6547
6548static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6549{
6550	down_write(&rbd_dev->lock_rwsem);
6551	if (__rbd_is_lock_owner(rbd_dev))
6552		__rbd_release_lock(rbd_dev);
6553	up_write(&rbd_dev->lock_rwsem);
6554}
6555
6556/*
6557 * If the wait is interrupted, an error is returned even if the lock
6558 * was successfully acquired.  rbd_dev_image_unlock() will release it
6559 * if needed.
6560 */
6561static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6562{
6563	long ret;
6564
6565	if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6566		if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6567			return 0;
6568
6569		rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6570		return -EINVAL;
6571	}
6572
6573	if (rbd_is_ro(rbd_dev))
6574		return 0;
6575
6576	rbd_assert(!rbd_is_lock_owner(rbd_dev));
6577	queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6578	ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6579			    ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6580	if (ret > 0) {
6581		ret = rbd_dev->acquire_err;
6582	} else {
6583		cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6584		if (!ret)
6585			ret = -ETIMEDOUT;
6586
6587		rbd_warn(rbd_dev, "failed to acquire lock: %ld", ret);
6588	}
6589	if (ret)
6590		return ret;
6591
6592	/*
6593	 * The lock may have been released by now, unless automatic lock
6594	 * transitions are disabled.
6595	 */
6596	rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6597	return 0;
6598}
6599
6600/*
6601 * An rbd format 2 image has a unique identifier, distinct from the
6602 * name given to it by the user.  Internally, that identifier is
6603 * what's used to specify the names of objects related to the image.
6604 *
6605 * A special "rbd id" object is used to map an rbd image name to its
6606 * id.  If that object doesn't exist, then there is no v2 rbd image
6607 * with the supplied name.
6608 *
6609 * This function will record the given rbd_dev's image_id field if
6610 * it can be determined, and in that case will return 0.  If any
6611 * errors occur a negative errno will be returned and the rbd_dev's
6612 * image_id field will be unchanged (and should be NULL).
6613 */
6614static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6615{
6616	int ret;
6617	size_t size;
6618	CEPH_DEFINE_OID_ONSTACK(oid);
6619	void *response;
6620	char *image_id;
6621
6622	/*
6623	 * When probing a parent image, the image id is already
6624	 * known (and the image name likely is not).  There's no
6625	 * need to fetch the image id again in this case.  We
6626	 * do still need to set the image format though.
6627	 */
6628	if (rbd_dev->spec->image_id) {
6629		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6630
6631		return 0;
6632	}
6633
6634	/*
6635	 * First, see if the format 2 image id file exists, and if
6636	 * so, get the image's persistent id from it.
6637	 */
6638	ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6639			       rbd_dev->spec->image_name);
6640	if (ret)
6641		return ret;
6642
6643	dout("rbd id object name is %s\n", oid.name);
6644
6645	/* Response will be an encoded string, which includes a length */
6646	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6647	response = kzalloc(size, GFP_NOIO);
6648	if (!response) {
6649		ret = -ENOMEM;
6650		goto out;
6651	}
6652
6653	/* If it doesn't exist we'll assume it's a format 1 image */
6654
6655	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6656				  "get_id", NULL, 0,
6657				  response, size);
6658	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6659	if (ret == -ENOENT) {
6660		image_id = kstrdup("", GFP_KERNEL);
6661		ret = image_id ? 0 : -ENOMEM;
6662		if (!ret)
6663			rbd_dev->image_format = 1;
6664	} else if (ret >= 0) {
6665		void *p = response;
6666
6667		image_id = ceph_extract_encoded_string(&p, p + ret,
6668						NULL, GFP_NOIO);
6669		ret = PTR_ERR_OR_ZERO(image_id);
6670		if (!ret)
6671			rbd_dev->image_format = 2;
6672	}
6673
6674	if (!ret) {
6675		rbd_dev->spec->image_id = image_id;
6676		dout("image_id is %s\n", image_id);
6677	}
6678out:
6679	kfree(response);
6680	ceph_oid_destroy(&oid);
6681	return ret;
6682}
6683
6684/*
6685 * Undo whatever state changes are made by v1 or v2 header info
6686 * call.
6687 */
6688static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6689{
6690	rbd_dev_parent_put(rbd_dev);
6691	rbd_object_map_free(rbd_dev);
6692	rbd_dev_mapping_clear(rbd_dev);
6693
6694	/* Free dynamic fields from the header, then zero it out */
6695
6696	rbd_image_header_cleanup(&rbd_dev->header);
6697}
6698
6699static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev,
6700				     struct rbd_image_header *header)
6701{
6702	int ret;
6703
6704	ret = rbd_dev_v2_object_prefix(rbd_dev, &header->object_prefix);
6705	if (ret)
6706		return ret;
6707
6708	/*
6709	 * Get the and check features for the image.  Currently the
6710	 * features are assumed to never change.
6711	 */
6712	ret = _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
6713					rbd_is_ro(rbd_dev), &header->features);
6714	if (ret)
6715		return ret;
6716
6717	/* If the image supports fancy striping, get its parameters */
6718
6719	if (header->features & RBD_FEATURE_STRIPINGV2) {
6720		ret = rbd_dev_v2_striping_info(rbd_dev, &header->stripe_unit,
6721					       &header->stripe_count);
6722		if (ret)
6723			return ret;
6724	}
6725
6726	if (header->features & RBD_FEATURE_DATA_POOL) {
6727		ret = rbd_dev_v2_data_pool(rbd_dev, &header->data_pool_id);
6728		if (ret)
6729			return ret;
6730	}
6731
6732	return 0;
6733}
6734
6735/*
6736 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6737 * rbd_dev_image_probe() recursion depth, which means it's also the
6738 * length of the already discovered part of the parent chain.
6739 */
6740static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6741{
6742	struct rbd_device *parent = NULL;
6743	int ret;
6744
6745	if (!rbd_dev->parent_spec)
6746		return 0;
6747
6748	if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6749		pr_info("parent chain is too long (%d)\n", depth);
6750		ret = -EINVAL;
6751		goto out_err;
6752	}
6753
6754	parent = __rbd_dev_create(rbd_dev->parent_spec);
6755	if (!parent) {
6756		ret = -ENOMEM;
6757		goto out_err;
6758	}
6759
6760	/*
6761	 * Images related by parent/child relationships always share
6762	 * rbd_client and spec/parent_spec, so bump their refcounts.
6763	 */
6764	parent->rbd_client = __rbd_get_client(rbd_dev->rbd_client);
6765	parent->spec = rbd_spec_get(rbd_dev->parent_spec);
6766
6767	__set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6768
6769	ret = rbd_dev_image_probe(parent, depth);
6770	if (ret < 0)
6771		goto out_err;
6772
6773	rbd_dev->parent = parent;
6774	atomic_set(&rbd_dev->parent_ref, 1);
6775	return 0;
6776
6777out_err:
6778	rbd_dev_unparent(rbd_dev);
6779	rbd_dev_destroy(parent);
6780	return ret;
6781}
6782
6783static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6784{
6785	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6786	rbd_free_disk(rbd_dev);
6787	if (!single_major)
6788		unregister_blkdev(rbd_dev->major, rbd_dev->name);
6789}
6790
6791/*
6792 * rbd_dev->header_rwsem must be locked for write and will be unlocked
6793 * upon return.
6794 */
6795static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6796{
6797	int ret;
6798
6799	/* Record our major and minor device numbers. */
6800
6801	if (!single_major) {
6802		ret = register_blkdev(0, rbd_dev->name);
6803		if (ret < 0)
6804			goto err_out_unlock;
6805
6806		rbd_dev->major = ret;
6807		rbd_dev->minor = 0;
6808	} else {
6809		rbd_dev->major = rbd_major;
6810		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6811	}
6812
6813	/* Set up the blkdev mapping. */
6814
6815	ret = rbd_init_disk(rbd_dev);
6816	if (ret)
6817		goto err_out_blkdev;
6818
6819	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6820	set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6821
6822	ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6823	if (ret)
6824		goto err_out_disk;
6825
6826	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6827	up_write(&rbd_dev->header_rwsem);
6828	return 0;
6829
6830err_out_disk:
6831	rbd_free_disk(rbd_dev);
6832err_out_blkdev:
6833	if (!single_major)
6834		unregister_blkdev(rbd_dev->major, rbd_dev->name);
6835err_out_unlock:
6836	up_write(&rbd_dev->header_rwsem);
6837	return ret;
6838}
6839
6840static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6841{
6842	struct rbd_spec *spec = rbd_dev->spec;
6843	int ret;
6844
6845	/* Record the header object name for this rbd image. */
6846
6847	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6848	if (rbd_dev->image_format == 1)
6849		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6850				       spec->image_name, RBD_SUFFIX);
6851	else
6852		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6853				       RBD_HEADER_PREFIX, spec->image_id);
6854
6855	return ret;
6856}
6857
6858static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6859{
6860	if (!is_snap) {
6861		pr_info("image %s/%s%s%s does not exist\n",
6862			rbd_dev->spec->pool_name,
6863			rbd_dev->spec->pool_ns ?: "",
6864			rbd_dev->spec->pool_ns ? "/" : "",
6865			rbd_dev->spec->image_name);
6866	} else {
6867		pr_info("snap %s/%s%s%s@%s does not exist\n",
6868			rbd_dev->spec->pool_name,
6869			rbd_dev->spec->pool_ns ?: "",
6870			rbd_dev->spec->pool_ns ? "/" : "",
6871			rbd_dev->spec->image_name,
6872			rbd_dev->spec->snap_name);
6873	}
6874}
6875
6876static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6877{
6878	if (!rbd_is_ro(rbd_dev))
6879		rbd_unregister_watch(rbd_dev);
6880
6881	rbd_dev_unprobe(rbd_dev);
6882	rbd_dev->image_format = 0;
6883	kfree(rbd_dev->spec->image_id);
6884	rbd_dev->spec->image_id = NULL;
6885}
6886
6887/*
6888 * Probe for the existence of the header object for the given rbd
6889 * device.  If this image is the one being mapped (i.e., not a
6890 * parent), initiate a watch on its header object before using that
6891 * object to get detailed information about the rbd image.
6892 *
6893 * On success, returns with header_rwsem held for write if called
6894 * with @depth == 0.
6895 */
6896static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6897{
6898	bool need_watch = !rbd_is_ro(rbd_dev);
6899	int ret;
6900
6901	/*
6902	 * Get the id from the image id object.  Unless there's an
6903	 * error, rbd_dev->spec->image_id will be filled in with
6904	 * a dynamically-allocated string, and rbd_dev->image_format
6905	 * will be set to either 1 or 2.
6906	 */
6907	ret = rbd_dev_image_id(rbd_dev);
6908	if (ret)
6909		return ret;
6910
6911	ret = rbd_dev_header_name(rbd_dev);
6912	if (ret)
6913		goto err_out_format;
6914
6915	if (need_watch) {
6916		ret = rbd_register_watch(rbd_dev);
6917		if (ret) {
6918			if (ret == -ENOENT)
6919				rbd_print_dne(rbd_dev, false);
6920			goto err_out_format;
6921		}
6922	}
6923
6924	if (!depth)
6925		down_write(&rbd_dev->header_rwsem);
6926
6927	ret = rbd_dev_header_info(rbd_dev, &rbd_dev->header, true);
6928	if (ret) {
6929		if (ret == -ENOENT && !need_watch)
6930			rbd_print_dne(rbd_dev, false);
6931		goto err_out_probe;
6932	}
6933
6934	rbd_init_layout(rbd_dev);
6935
6936	/*
6937	 * If this image is the one being mapped, we have pool name and
6938	 * id, image name and id, and snap name - need to fill snap id.
6939	 * Otherwise this is a parent image, identified by pool, image
6940	 * and snap ids - need to fill in names for those ids.
6941	 */
6942	if (!depth)
6943		ret = rbd_spec_fill_snap_id(rbd_dev);
6944	else
6945		ret = rbd_spec_fill_names(rbd_dev);
6946	if (ret) {
6947		if (ret == -ENOENT)
6948			rbd_print_dne(rbd_dev, true);
6949		goto err_out_probe;
6950	}
6951
6952	ret = rbd_dev_mapping_set(rbd_dev);
6953	if (ret)
6954		goto err_out_probe;
6955
6956	if (rbd_is_snap(rbd_dev) &&
6957	    (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
6958		ret = rbd_object_map_load(rbd_dev);
6959		if (ret)
6960			goto err_out_probe;
6961	}
6962
6963	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6964		ret = rbd_dev_setup_parent(rbd_dev);
6965		if (ret)
6966			goto err_out_probe;
6967	}
6968
6969	ret = rbd_dev_probe_parent(rbd_dev, depth);
6970	if (ret)
6971		goto err_out_probe;
6972
6973	dout("discovered format %u image, header name is %s\n",
6974		rbd_dev->image_format, rbd_dev->header_oid.name);
6975	return 0;
6976
6977err_out_probe:
6978	if (!depth)
6979		up_write(&rbd_dev->header_rwsem);
6980	if (need_watch)
6981		rbd_unregister_watch(rbd_dev);
6982	rbd_dev_unprobe(rbd_dev);
6983err_out_format:
6984	rbd_dev->image_format = 0;
6985	kfree(rbd_dev->spec->image_id);
6986	rbd_dev->spec->image_id = NULL;
6987	return ret;
6988}
6989
6990static void rbd_dev_update_header(struct rbd_device *rbd_dev,
6991				  struct rbd_image_header *header)
6992{
6993	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6994	rbd_assert(rbd_dev->header.object_prefix); /* !first_time */
6995
6996	if (rbd_dev->header.image_size != header->image_size) {
6997		rbd_dev->header.image_size = header->image_size;
6998
6999		if (!rbd_is_snap(rbd_dev)) {
7000			rbd_dev->mapping.size = header->image_size;
7001			rbd_dev_update_size(rbd_dev);
7002		}
7003	}
7004
7005	ceph_put_snap_context(rbd_dev->header.snapc);
7006	rbd_dev->header.snapc = header->snapc;
7007	header->snapc = NULL;
7008
7009	if (rbd_dev->image_format == 1) {
7010		kfree(rbd_dev->header.snap_names);
7011		rbd_dev->header.snap_names = header->snap_names;
7012		header->snap_names = NULL;
7013
7014		kfree(rbd_dev->header.snap_sizes);
7015		rbd_dev->header.snap_sizes = header->snap_sizes;
7016		header->snap_sizes = NULL;
7017	}
7018}
7019
7020static void rbd_dev_update_parent(struct rbd_device *rbd_dev,
7021				  struct parent_image_info *pii)
7022{
7023	if (pii->pool_id == CEPH_NOPOOL || !pii->has_overlap) {
7024		/*
7025		 * Either the parent never existed, or we have
7026		 * record of it but the image got flattened so it no
7027		 * longer has a parent.  When the parent of a
7028		 * layered image disappears we immediately set the
7029		 * overlap to 0.  The effect of this is that all new
7030		 * requests will be treated as if the image had no
7031		 * parent.
7032		 *
7033		 * If !pii.has_overlap, the parent image spec is not
7034		 * applicable.  It's there to avoid duplication in each
7035		 * snapshot record.
7036		 */
7037		if (rbd_dev->parent_overlap) {
7038			rbd_dev->parent_overlap = 0;
7039			rbd_dev_parent_put(rbd_dev);
7040			pr_info("%s: clone has been flattened\n",
7041				rbd_dev->disk->disk_name);
7042		}
7043	} else {
7044		rbd_assert(rbd_dev->parent_spec);
7045
7046		/*
7047		 * Update the parent overlap.  If it became zero, issue
7048		 * a warning as we will proceed as if there is no parent.
7049		 */
7050		if (!pii->overlap && rbd_dev->parent_overlap)
7051			rbd_warn(rbd_dev,
7052				 "clone has become standalone (overlap 0)");
7053		rbd_dev->parent_overlap = pii->overlap;
7054	}
7055}
7056
7057static int rbd_dev_refresh(struct rbd_device *rbd_dev)
7058{
7059	struct rbd_image_header	header = { 0 };
7060	struct parent_image_info pii = { 0 };
7061	int ret;
7062
7063	dout("%s rbd_dev %p\n", __func__, rbd_dev);
7064
7065	ret = rbd_dev_header_info(rbd_dev, &header, false);
7066	if (ret)
7067		goto out;
7068
7069	/*
7070	 * If there is a parent, see if it has disappeared due to the
7071	 * mapped image getting flattened.
7072	 */
7073	if (rbd_dev->parent) {
7074		ret = rbd_dev_v2_parent_info(rbd_dev, &pii);
7075		if (ret)
7076			goto out;
7077	}
7078
7079	down_write(&rbd_dev->header_rwsem);
7080	rbd_dev_update_header(rbd_dev, &header);
7081	if (rbd_dev->parent)
7082		rbd_dev_update_parent(rbd_dev, &pii);
7083	up_write(&rbd_dev->header_rwsem);
7084
7085out:
7086	rbd_parent_info_cleanup(&pii);
7087	rbd_image_header_cleanup(&header);
7088	return ret;
7089}
7090
7091static ssize_t do_rbd_add(const char *buf, size_t count)
7092{
7093	struct rbd_device *rbd_dev = NULL;
7094	struct ceph_options *ceph_opts = NULL;
7095	struct rbd_options *rbd_opts = NULL;
7096	struct rbd_spec *spec = NULL;
7097	struct rbd_client *rbdc;
7098	int rc;
7099
7100	if (!capable(CAP_SYS_ADMIN))
7101		return -EPERM;
7102
7103	if (!try_module_get(THIS_MODULE))
7104		return -ENODEV;
7105
7106	/* parse add command */
7107	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
7108	if (rc < 0)
7109		goto out;
7110
7111	rbdc = rbd_get_client(ceph_opts);
7112	if (IS_ERR(rbdc)) {
7113		rc = PTR_ERR(rbdc);
7114		goto err_out_args;
7115	}
7116
7117	/* pick the pool */
7118	rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7119	if (rc < 0) {
7120		if (rc == -ENOENT)
7121			pr_info("pool %s does not exist\n", spec->pool_name);
7122		goto err_out_client;
7123	}
7124	spec->pool_id = (u64)rc;
7125
7126	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7127	if (!rbd_dev) {
7128		rc = -ENOMEM;
7129		goto err_out_client;
7130	}
7131	rbdc = NULL;		/* rbd_dev now owns this */
7132	spec = NULL;		/* rbd_dev now owns this */
7133	rbd_opts = NULL;	/* rbd_dev now owns this */
7134
7135	/* if we are mapping a snapshot it will be a read-only mapping */
7136	if (rbd_dev->opts->read_only ||
7137	    strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7138		__set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7139
7140	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7141	if (!rbd_dev->config_info) {
7142		rc = -ENOMEM;
7143		goto err_out_rbd_dev;
7144	}
7145
7146	rc = rbd_dev_image_probe(rbd_dev, 0);
7147	if (rc < 0)
7148		goto err_out_rbd_dev;
7149
7150	if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7151		rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7152			 rbd_dev->layout.object_size);
7153		rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7154	}
7155
7156	rc = rbd_dev_device_setup(rbd_dev);
7157	if (rc)
7158		goto err_out_image_probe;
7159
7160	rc = rbd_add_acquire_lock(rbd_dev);
7161	if (rc)
7162		goto err_out_image_lock;
7163
7164	/* Everything's ready.  Announce the disk to the world. */
7165
7166	rc = device_add(&rbd_dev->dev);
7167	if (rc)
7168		goto err_out_image_lock;
7169
7170	rc = device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
7171	if (rc)
7172		goto err_out_cleanup_disk;
7173
7174	spin_lock(&rbd_dev_list_lock);
7175	list_add_tail(&rbd_dev->node, &rbd_dev_list);
7176	spin_unlock(&rbd_dev_list_lock);
7177
7178	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7179		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7180		rbd_dev->header.features);
7181	rc = count;
7182out:
7183	module_put(THIS_MODULE);
7184	return rc;
7185
7186err_out_cleanup_disk:
7187	rbd_free_disk(rbd_dev);
7188err_out_image_lock:
7189	rbd_dev_image_unlock(rbd_dev);
7190	rbd_dev_device_release(rbd_dev);
7191err_out_image_probe:
7192	rbd_dev_image_release(rbd_dev);
7193err_out_rbd_dev:
7194	rbd_dev_destroy(rbd_dev);
7195err_out_client:
7196	rbd_put_client(rbdc);
7197err_out_args:
7198	rbd_spec_put(spec);
7199	kfree(rbd_opts);
7200	goto out;
7201}
7202
7203static ssize_t add_store(const struct bus_type *bus, const char *buf, size_t count)
7204{
7205	if (single_major)
7206		return -EINVAL;
7207
7208	return do_rbd_add(buf, count);
7209}
7210
7211static ssize_t add_single_major_store(const struct bus_type *bus, const char *buf,
7212				      size_t count)
7213{
7214	return do_rbd_add(buf, count);
7215}
7216
7217static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7218{
7219	while (rbd_dev->parent) {
7220		struct rbd_device *first = rbd_dev;
7221		struct rbd_device *second = first->parent;
7222		struct rbd_device *third;
7223
7224		/*
7225		 * Follow to the parent with no grandparent and
7226		 * remove it.
7227		 */
7228		while (second && (third = second->parent)) {
7229			first = second;
7230			second = third;
7231		}
7232		rbd_assert(second);
7233		rbd_dev_image_release(second);
7234		rbd_dev_destroy(second);
7235		first->parent = NULL;
7236		first->parent_overlap = 0;
7237
7238		rbd_assert(first->parent_spec);
7239		rbd_spec_put(first->parent_spec);
7240		first->parent_spec = NULL;
7241	}
7242}
7243
7244static ssize_t do_rbd_remove(const char *buf, size_t count)
7245{
7246	struct rbd_device *rbd_dev = NULL;
7247	int dev_id;
7248	char opt_buf[6];
7249	bool force = false;
7250	int ret;
7251
7252	if (!capable(CAP_SYS_ADMIN))
7253		return -EPERM;
7254
7255	dev_id = -1;
7256	opt_buf[0] = '\0';
7257	sscanf(buf, "%d %5s", &dev_id, opt_buf);
7258	if (dev_id < 0) {
7259		pr_err("dev_id out of range\n");
7260		return -EINVAL;
7261	}
7262	if (opt_buf[0] != '\0') {
7263		if (!strcmp(opt_buf, "force")) {
7264			force = true;
7265		} else {
7266			pr_err("bad remove option at '%s'\n", opt_buf);
7267			return -EINVAL;
7268		}
7269	}
7270
7271	ret = -ENOENT;
7272	spin_lock(&rbd_dev_list_lock);
7273	list_for_each_entry(rbd_dev, &rbd_dev_list, node) {
7274		if (rbd_dev->dev_id == dev_id) {
7275			ret = 0;
7276			break;
7277		}
7278	}
7279	if (!ret) {
7280		spin_lock_irq(&rbd_dev->lock);
7281		if (rbd_dev->open_count && !force)
7282			ret = -EBUSY;
7283		else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7284					  &rbd_dev->flags))
7285			ret = -EINPROGRESS;
7286		spin_unlock_irq(&rbd_dev->lock);
7287	}
7288	spin_unlock(&rbd_dev_list_lock);
7289	if (ret)
7290		return ret;
7291
7292	if (force) {
7293		/*
7294		 * Prevent new IO from being queued and wait for existing
7295		 * IO to complete/fail.
7296		 */
7297		blk_mq_freeze_queue(rbd_dev->disk->queue);
7298		blk_mark_disk_dead(rbd_dev->disk);
7299	}
7300
7301	del_gendisk(rbd_dev->disk);
7302	spin_lock(&rbd_dev_list_lock);
7303	list_del_init(&rbd_dev->node);
7304	spin_unlock(&rbd_dev_list_lock);
7305	device_del(&rbd_dev->dev);
7306
7307	rbd_dev_image_unlock(rbd_dev);
7308	rbd_dev_device_release(rbd_dev);
7309	rbd_dev_image_release(rbd_dev);
7310	rbd_dev_destroy(rbd_dev);
7311	return count;
7312}
7313
7314static ssize_t remove_store(const struct bus_type *bus, const char *buf, size_t count)
7315{
7316	if (single_major)
7317		return -EINVAL;
7318
7319	return do_rbd_remove(buf, count);
7320}
7321
7322static ssize_t remove_single_major_store(const struct bus_type *bus, const char *buf,
7323					 size_t count)
7324{
7325	return do_rbd_remove(buf, count);
7326}
7327
7328/*
7329 * create control files in sysfs
7330 * /sys/bus/rbd/...
7331 */
7332static int __init rbd_sysfs_init(void)
7333{
7334	int ret;
7335
7336	ret = device_register(&rbd_root_dev);
7337	if (ret < 0) {
7338		put_device(&rbd_root_dev);
7339		return ret;
7340	}
7341
7342	ret = bus_register(&rbd_bus_type);
7343	if (ret < 0)
7344		device_unregister(&rbd_root_dev);
7345
7346	return ret;
7347}
7348
7349static void __exit rbd_sysfs_cleanup(void)
7350{
7351	bus_unregister(&rbd_bus_type);
7352	device_unregister(&rbd_root_dev);
7353}
7354
7355static int __init rbd_slab_init(void)
7356{
7357	rbd_assert(!rbd_img_request_cache);
7358	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7359	if (!rbd_img_request_cache)
7360		return -ENOMEM;
7361
7362	rbd_assert(!rbd_obj_request_cache);
7363	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7364	if (!rbd_obj_request_cache)
7365		goto out_err;
7366
7367	return 0;
7368
7369out_err:
7370	kmem_cache_destroy(rbd_img_request_cache);
7371	rbd_img_request_cache = NULL;
7372	return -ENOMEM;
7373}
7374
7375static void rbd_slab_exit(void)
7376{
7377	rbd_assert(rbd_obj_request_cache);
7378	kmem_cache_destroy(rbd_obj_request_cache);
7379	rbd_obj_request_cache = NULL;
7380
7381	rbd_assert(rbd_img_request_cache);
7382	kmem_cache_destroy(rbd_img_request_cache);
7383	rbd_img_request_cache = NULL;
7384}
7385
7386static int __init rbd_init(void)
7387{
7388	int rc;
7389
7390	if (!libceph_compatible(NULL)) {
7391		rbd_warn(NULL, "libceph incompatibility (quitting)");
7392		return -EINVAL;
7393	}
7394
7395	rc = rbd_slab_init();
7396	if (rc)
7397		return rc;
7398
7399	/*
7400	 * The number of active work items is limited by the number of
7401	 * rbd devices * queue depth, so leave @max_active at default.
7402	 */
7403	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7404	if (!rbd_wq) {
7405		rc = -ENOMEM;
7406		goto err_out_slab;
7407	}
7408
7409	if (single_major) {
7410		rbd_major = register_blkdev(0, RBD_DRV_NAME);
7411		if (rbd_major < 0) {
7412			rc = rbd_major;
7413			goto err_out_wq;
7414		}
7415	}
7416
7417	rc = rbd_sysfs_init();
7418	if (rc)
7419		goto err_out_blkdev;
7420
7421	if (single_major)
7422		pr_info("loaded (major %d)\n", rbd_major);
7423	else
7424		pr_info("loaded\n");
7425
7426	return 0;
7427
7428err_out_blkdev:
7429	if (single_major)
7430		unregister_blkdev(rbd_major, RBD_DRV_NAME);
7431err_out_wq:
7432	destroy_workqueue(rbd_wq);
7433err_out_slab:
7434	rbd_slab_exit();
7435	return rc;
7436}
7437
7438static void __exit rbd_exit(void)
7439{
7440	ida_destroy(&rbd_dev_id_ida);
7441	rbd_sysfs_cleanup();
7442	if (single_major)
7443		unregister_blkdev(rbd_major, RBD_DRV_NAME);
7444	destroy_workqueue(rbd_wq);
7445	rbd_slab_exit();
7446}
7447
7448module_init(rbd_init);
7449module_exit(rbd_exit);
7450
7451MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7452MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7453MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7454/* following authorship retained from original osdblk.c */
7455MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7456
7457MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7458MODULE_LICENSE("GPL");
7459