From 033268a5f01270f0ef20d1a9a078b157f4af97f8 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 12 Aug 2016 14:59:58 +0200 Subject: libceph: rename ceph_client_id() -> ceph_client_gid() It's gid / global_id in other places. Signed-off-by: Ilya Dryomov Reviewed-by: Mike Christie Reviewed-by: Alex Elder --- drivers/block/rbd.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 6c6519f6492a..e0585e9040f1 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -3758,7 +3758,7 @@ static ssize_t rbd_client_id_show(struct device *dev, struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); return sprintf(buf, "client%lld\n", - ceph_client_id(rbd_dev->rbd_client->client)); + ceph_client_gid(rbd_dev->rbd_client->client)); } static ssize_t rbd_pool_show(struct device *dev, -- cgit v1.2.3 From 1643dfa4c2c827d6e2aa419df8c17b0f24090278 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 12 Aug 2016 15:45:52 +0200 Subject: rbd: introduce a per-device ordered workqueue This is going to be used for re-registering watch requests and exclusive-lock tasks: acquire/request lock, notify-acquired, release lock, notify-released. Some refactoring in the map/unmap paths was necessary to give this workqueue a meaningful name: "rbdX-tasks". Signed-off-by: Ilya Dryomov Reviewed-by: Mike Christie --- drivers/block/rbd.c | 151 ++++++++++++++++++++++++---------------------------- 1 file changed, 71 insertions(+), 80 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index e0585e9040f1..1c805eea6767 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -128,11 +128,8 @@ static int atomic_dec_return_safe(atomic_t *v) /* * An RBD device name will be "rbd#", where the "rbd" comes from * RBD_DRV_NAME above, and # is a unique integer identifier. - * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big - * enough to hold all possible device names. */ #define DEV_NAME_LEN 32 -#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1) /* * block device image metadata (in-memory version) @@ -353,10 +350,12 @@ struct rbd_device { struct ceph_object_id header_oid; struct ceph_object_locator header_oloc; - struct ceph_file_layout layout; + struct ceph_file_layout layout; /* used for all rbd requests */ struct ceph_osd_linger_request *watch_handle; + struct workqueue_struct *task_wq; + struct rbd_spec *parent_spec; u64 parent_overlap; atomic_t parent_ref; @@ -3944,11 +3943,8 @@ static void rbd_spec_free(struct kref *kref) kfree(spec); } -static void rbd_dev_release(struct device *dev) +static void rbd_dev_free(struct rbd_device *rbd_dev) { - struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - bool need_put = !!rbd_dev->opts; - ceph_oid_destroy(&rbd_dev->header_oid); ceph_oloc_destroy(&rbd_dev->header_oloc); @@ -3956,6 +3952,19 @@ static void rbd_dev_release(struct device *dev) rbd_spec_put(rbd_dev->spec); kfree(rbd_dev->opts); kfree(rbd_dev); +} + +static void rbd_dev_release(struct device *dev) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + bool need_put = !!rbd_dev->opts; + + if (need_put) { + destroy_workqueue(rbd_dev->task_wq); + ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); + } + + rbd_dev_free(rbd_dev); /* * This is racy, but way better than putting module outside of @@ -3966,19 +3975,16 @@ static void rbd_dev_release(struct device *dev) module_put(THIS_MODULE); } -static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, - struct rbd_spec *spec, - struct rbd_options *opts) +static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, + struct rbd_spec *spec) { struct rbd_device *rbd_dev; - rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); + rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); if (!rbd_dev) return NULL; spin_lock_init(&rbd_dev->lock); - rbd_dev->flags = 0; - atomic_set(&rbd_dev->parent_ref, 0); INIT_LIST_HEAD(&rbd_dev->node); init_rwsem(&rbd_dev->header_rwsem); @@ -3992,9 +3998,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->rbd_client = rbdc; rbd_dev->spec = spec; - rbd_dev->opts = opts; - - /* Initialize the layout used for all rbd requests */ rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER; rbd_dev->layout.stripe_count = 1; @@ -4002,15 +4005,48 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->layout.pool_id = spec->pool_id; RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); - /* - * If this is a mapping rbd_dev (as opposed to a parent one), - * pin our module. We have a ref from do_rbd_add(), so use - * __module_get(). - */ - if (rbd_dev->opts) - __module_get(THIS_MODULE); + return rbd_dev; +} + +/* + * Create a mapping rbd_dev. + */ +static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, + struct rbd_spec *spec, + struct rbd_options *opts) +{ + struct rbd_device *rbd_dev; + + rbd_dev = __rbd_dev_create(rbdc, spec); + if (!rbd_dev) + return NULL; + + rbd_dev->opts = opts; + + /* get an id and fill in device name */ + rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0, + minor_to_rbd_dev_id(1 << MINORBITS), + GFP_KERNEL); + if (rbd_dev->dev_id < 0) + goto fail_rbd_dev; + + sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id); + rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM, + rbd_dev->name); + if (!rbd_dev->task_wq) + goto fail_dev_id; + /* we have a ref from do_rbd_add() */ + __module_get(THIS_MODULE); + + dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id); return rbd_dev; + +fail_dev_id: + ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); +fail_rbd_dev: + rbd_dev_free(rbd_dev); + return NULL; } static void rbd_dev_destroy(struct rbd_device *rbd_dev) @@ -4645,46 +4681,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev) return rbd_dev_v2_header_info(rbd_dev); } -/* - * Get a unique rbd identifier for the given new rbd_dev, and add - * the rbd_dev to the global list. - */ -static int rbd_dev_id_get(struct rbd_device *rbd_dev) -{ - int new_dev_id; - - new_dev_id = ida_simple_get(&rbd_dev_id_ida, - 0, minor_to_rbd_dev_id(1 << MINORBITS), - GFP_KERNEL); - if (new_dev_id < 0) - return new_dev_id; - - rbd_dev->dev_id = new_dev_id; - - spin_lock(&rbd_dev_list_lock); - list_add_tail(&rbd_dev->node, &rbd_dev_list); - spin_unlock(&rbd_dev_list_lock); - - dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id); - - return 0; -} - -/* - * Remove an rbd_dev from the global list, and record that its - * identifier is no longer in use. - */ -static void rbd_dev_id_put(struct rbd_device *rbd_dev) -{ - spin_lock(&rbd_dev_list_lock); - list_del_init(&rbd_dev->node); - spin_unlock(&rbd_dev_list_lock); - - ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); - - dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id); -} - /* * Skips over white space at *buf, and updates *buf to point to the * first found non-space character (if any). Returns the length of @@ -5077,8 +5073,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) goto out_err; } - parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec, - NULL); + parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec); if (!parent) { ret = -ENOMEM; goto out_err; @@ -5113,22 +5108,12 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) { int ret; - /* Get an id and fill in device name. */ - - ret = rbd_dev_id_get(rbd_dev); - if (ret) - goto err_out_unlock; - - BUILD_BUG_ON(DEV_NAME_LEN - < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); - sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); - /* Record our major and minor device numbers. */ if (!single_major) { ret = register_blkdev(0, rbd_dev->name); if (ret < 0) - goto err_out_id; + goto err_out_unlock; rbd_dev->major = ret; rbd_dev->minor = 0; @@ -5160,6 +5145,10 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); up_write(&rbd_dev->header_rwsem); + spin_lock(&rbd_dev_list_lock); + list_add_tail(&rbd_dev->node, &rbd_dev_list); + spin_unlock(&rbd_dev_list_lock); + add_disk(rbd_dev->disk); pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, (unsigned long long) rbd_dev->mapping.size); @@ -5173,8 +5162,6 @@ err_out_disk: err_out_blkdev: if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); -err_out_id: - rbd_dev_id_put(rbd_dev); err_out_unlock: up_write(&rbd_dev->header_rwsem); return ret; @@ -5406,12 +5393,16 @@ static ssize_t rbd_add_single_major(struct bus_type *bus, static void rbd_dev_device_release(struct rbd_device *rbd_dev) { rbd_free_disk(rbd_dev); + + spin_lock(&rbd_dev_list_lock); + list_del_init(&rbd_dev->node); + spin_unlock(&rbd_dev_list_lock); + clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); device_del(&rbd_dev->dev); rbd_dev_mapping_clear(rbd_dev); if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); - rbd_dev_id_put(rbd_dev); } static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) -- cgit v1.2.3 From 99d1694310df3ffef66902f5bc1a23e95a724aa3 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 12 Aug 2016 16:11:41 +0200 Subject: rbd: retry watch re-registration periodically Revamp watch code to support retrying watch re-registration: - add rbd_dev->watch_state for more robust errcb handling - store watch cookie separately to avoid dereferencing watch_handle which is set to NULL on unwatch - move re-register code into a delayed work and retry re-registration every second, unless the client is blacklisted Signed-off-by: Ilya Dryomov Reviewed-by: Mike Christie Tested-by: Mike Christie --- drivers/block/rbd.c | 138 +++++++++++++++++++++++++++++++++++++++----------- net/ceph/osd_client.c | 1 + 2 files changed, 110 insertions(+), 29 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 1c805eea6767..cb96fb19e8a7 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -114,6 +114,8 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_OBJ_PREFIX_LEN_MAX 64 +#define RBD_RETRY_DELAY msecs_to_jiffies(1000) + /* Feature bits */ #define RBD_FEATURE_LAYERING (1<<0) @@ -319,6 +321,12 @@ struct rbd_img_request { #define for_each_obj_request_safe(ireq, oreq, n) \ list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) +enum rbd_watch_state { + RBD_WATCH_STATE_UNREGISTERED, + RBD_WATCH_STATE_REGISTERED, + RBD_WATCH_STATE_ERROR, +}; + struct rbd_mapping { u64 size; u64 features; @@ -352,7 +360,11 @@ struct rbd_device { struct ceph_file_layout layout; /* used for all rbd requests */ + struct mutex watch_mutex; + enum rbd_watch_state watch_state; struct ceph_osd_linger_request *watch_handle; + u64 watch_cookie; + struct delayed_work watch_dwork; struct workqueue_struct *task_wq; @@ -3083,9 +3095,6 @@ out_err: obj_request_done_set(obj_request); } -static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev); -static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev); - static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, u64 notifier_id, void *data, size_t data_len) { @@ -3113,35 +3122,34 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, rbd_warn(rbd_dev, "notify_ack ret %d", ret); } +static void __rbd_unregister_watch(struct rbd_device *rbd_dev); + static void rbd_watch_errcb(void *arg, u64 cookie, int err) { struct rbd_device *rbd_dev = arg; - int ret; rbd_warn(rbd_dev, "encountered watch error: %d", err); - __rbd_dev_header_unwatch_sync(rbd_dev); + mutex_lock(&rbd_dev->watch_mutex); + if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) { + __rbd_unregister_watch(rbd_dev); + rbd_dev->watch_state = RBD_WATCH_STATE_ERROR; - ret = rbd_dev_header_watch_sync(rbd_dev); - if (ret) { - rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); - return; + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0); } - - ret = rbd_dev_refresh(rbd_dev); - if (ret) - rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); + mutex_unlock(&rbd_dev->watch_mutex); } /* - * Initiate a watch request, synchronously. + * watch_mutex must be locked */ -static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) +static int __rbd_register_watch(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_linger_request *handle; rbd_assert(!rbd_dev->watch_handle); + dout("%s rbd_dev %p\n", __func__, rbd_dev); handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, rbd_watch_cb, @@ -3153,13 +3161,16 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) return 0; } -static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +/* + * watch_mutex must be locked + */ +static void __rbd_unregister_watch(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; int ret; - if (!rbd_dev->watch_handle) - return; + rbd_assert(rbd_dev->watch_handle); + dout("%s rbd_dev %p\n", __func__, rbd_dev); ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); if (ret) @@ -3168,17 +3179,80 @@ static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) rbd_dev->watch_handle = NULL; } -/* - * Tear down a watch request, synchronously. - */ -static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +static int rbd_register_watch(struct rbd_device *rbd_dev) +{ + int ret; + + mutex_lock(&rbd_dev->watch_mutex); + rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED); + ret = __rbd_register_watch(rbd_dev); + if (ret) + goto out; + + rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; + rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; + +out: + mutex_unlock(&rbd_dev->watch_mutex); + return ret; +} + +static void cancel_tasks_sync(struct rbd_device *rbd_dev) { - __rbd_dev_header_unwatch_sync(rbd_dev); + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + cancel_delayed_work_sync(&rbd_dev->watch_dwork); +} + +static void rbd_unregister_watch(struct rbd_device *rbd_dev) +{ + cancel_tasks_sync(rbd_dev); + + mutex_lock(&rbd_dev->watch_mutex); + if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) + __rbd_unregister_watch(rbd_dev); + rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; + mutex_unlock(&rbd_dev->watch_mutex); - dout("%s flushing notifies\n", __func__); ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); } +static void rbd_reregister_watch(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(to_delayed_work(work), + struct rbd_device, watch_dwork); + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + mutex_lock(&rbd_dev->watch_mutex); + if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) + goto fail_unlock; + + ret = __rbd_register_watch(rbd_dev); + if (ret) { + rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); + if (ret != -EBLACKLISTED) + queue_delayed_work(rbd_dev->task_wq, + &rbd_dev->watch_dwork, + RBD_RETRY_DELAY); + goto fail_unlock; + } + + rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; + rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; + mutex_unlock(&rbd_dev->watch_mutex); + + ret = rbd_dev_refresh(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); + + return; + +fail_unlock: + mutex_unlock(&rbd_dev->watch_mutex); +} + /* * Synchronous osd object method call. Returns the number of bytes * returned in the outbound buffer, or a negative error code. @@ -3945,6 +4019,8 @@ static void rbd_spec_free(struct kref *kref) static void rbd_dev_free(struct rbd_device *rbd_dev) { + WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED); + ceph_oid_destroy(&rbd_dev->header_oid); ceph_oloc_destroy(&rbd_dev->header_oloc); @@ -3991,6 +4067,10 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, ceph_oid_init(&rbd_dev->header_oid); ceph_oloc_init(&rbd_dev->header_oloc); + mutex_init(&rbd_dev->watch_mutex); + rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; + INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch); + rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; rbd_dev->dev.parent = &rbd_root_dev; @@ -5222,7 +5302,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) goto err_out_format; if (!depth) { - ret = rbd_dev_header_watch_sync(rbd_dev); + ret = rbd_register_watch(rbd_dev); if (ret) { if (ret == -ENOENT) pr_info("image %s/%s does not exist\n", @@ -5281,7 +5361,7 @@ err_out_probe: rbd_dev_unprobe(rbd_dev); err_out_watch: if (!depth) - rbd_dev_header_unwatch_sync(rbd_dev); + rbd_unregister_watch(rbd_dev); err_out_format: rbd_dev->image_format = 0; kfree(rbd_dev->spec->image_id); @@ -5348,11 +5428,11 @@ static ssize_t do_rbd_add(struct bus_type *bus, rc = rbd_dev_device_setup(rbd_dev); if (rc) { /* - * rbd_dev_header_unwatch_sync() can't be moved into + * rbd_unregister_watch() can't be moved into * rbd_dev_image_release() without refactoring, see * commit 1f3ef78861ac. */ - rbd_dev_header_unwatch_sync(rbd_dev); + rbd_unregister_watch(rbd_dev); rbd_dev_image_release(rbd_dev); goto out; } @@ -5473,7 +5553,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus, if (ret < 0 || already) return ret; - rbd_dev_header_unwatch_sync(rbd_dev); + rbd_unregister_watch(rbd_dev); /* * Don't free anything from rbd_dev->disk until after all diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index fbc6b7090c65..d9bf7a1d0a58 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -4014,6 +4014,7 @@ EXPORT_SYMBOL(ceph_osdc_list_watchers); */ void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) { + dout("%s osdc %p\n", __func__, osdc); flush_workqueue(osdc->notify_wq); } EXPORT_SYMBOL(ceph_osdc_flush_notifies); -- cgit v1.2.3 From ed95b21a4b0a71ef89306cdeb427d53cc9cb343f Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 12 Aug 2016 16:40:02 +0200 Subject: rbd: support for exclusive-lock feature Add basic support for RBD_FEATURE_EXCLUSIVE_LOCK feature. Maintenance operations (resize, snapshot create, etc) are offloaded to librbd via returning -EOPNOTSUPP - librbd should request the lock and execute the operation. Signed-off-by: Ilya Dryomov Reviewed-by: Mike Christie Tested-by: Mike Christie --- drivers/block/rbd.c | 812 +++++++++++++++++++++++++++++++++++++++++++++- drivers/block/rbd_types.h | 11 + net/ceph/ceph_strings.c | 1 + 3 files changed, 808 insertions(+), 16 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index cb96fb19e8a7..7cda1cc60c2c 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -114,14 +115,17 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_OBJ_PREFIX_LEN_MAX 64 +#define RBD_NOTIFY_TIMEOUT 5 /* seconds */ #define RBD_RETRY_DELAY msecs_to_jiffies(1000) /* Feature bits */ #define RBD_FEATURE_LAYERING (1<<0) #define RBD_FEATURE_STRIPINGV2 (1<<1) -#define RBD_FEATURES_ALL \ - (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2) +#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2) +#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ + RBD_FEATURE_STRIPINGV2 | \ + RBD_FEATURE_EXCLUSIVE_LOCK) /* Features supported by this (client software) implementation. */ @@ -327,6 +331,18 @@ enum rbd_watch_state { RBD_WATCH_STATE_ERROR, }; +enum rbd_lock_state { + RBD_LOCK_STATE_UNLOCKED, + RBD_LOCK_STATE_LOCKED, + RBD_LOCK_STATE_RELEASING, +}; + +/* WatchNotify::ClientId */ +struct rbd_client_id { + u64 gid; + u64 handle; +}; + struct rbd_mapping { u64 size; u64 features; @@ -366,6 +382,15 @@ struct rbd_device { u64 watch_cookie; struct delayed_work watch_dwork; + struct rw_semaphore lock_rwsem; + enum rbd_lock_state lock_state; + struct rbd_client_id owner_cid; + struct work_struct acquired_lock_work; + struct work_struct released_lock_work; + struct delayed_work lock_dwork; + struct work_struct unlock_work; + wait_queue_head_t lock_waitq; + struct workqueue_struct *task_wq; struct rbd_spec *parent_spec; @@ -450,6 +475,29 @@ static int minor_to_rbd_dev_id(int minor) return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; } +static bool rbd_is_lock_supported(struct rbd_device *rbd_dev) +{ + return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && + rbd_dev->spec->snap_id == CEPH_NOSNAP && + !rbd_dev->mapping.read_only; +} + +static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) +{ + return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || + rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; +} + +static bool rbd_is_lock_owner(struct rbd_device *rbd_dev) +{ + bool is_lock_owner; + + down_read(&rbd_dev->lock_rwsem); + is_lock_owner = __rbd_is_lock_owner(rbd_dev); + up_read(&rbd_dev->lock_rwsem); + return is_lock_owner; +} + static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove); static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major); @@ -3095,31 +3143,690 @@ out_err: obj_request_done_set(obj_request); } -static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, - u64 notifier_id, void *data, size_t data_len) +static const struct rbd_client_id rbd_empty_cid; + +static bool rbd_cid_equal(const struct rbd_client_id *lhs, + const struct rbd_client_id *rhs) +{ + return lhs->gid == rhs->gid && lhs->handle == rhs->handle; +} + +static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev) +{ + struct rbd_client_id cid; + + mutex_lock(&rbd_dev->watch_mutex); + cid.gid = ceph_client_gid(rbd_dev->rbd_client->client); + cid.handle = rbd_dev->watch_cookie; + mutex_unlock(&rbd_dev->watch_mutex); + return cid; +} + +/* + * lock_rwsem must be held for write + */ +static void rbd_set_owner_cid(struct rbd_device *rbd_dev, + const struct rbd_client_id *cid) +{ + dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev, + rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle, + cid->gid, cid->handle); + rbd_dev->owner_cid = *cid; /* struct */ +} + +static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf) +{ + mutex_lock(&rbd_dev->watch_mutex); + sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie); + mutex_unlock(&rbd_dev->watch_mutex); +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_lock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_client_id cid = rbd_get_cid(rbd_dev); + char cookie[32]; + int ret; + + WARN_ON(__rbd_is_lock_owner(rbd_dev)); + + format_lock_cookie(rbd_dev, cookie); + ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, + RBD_LOCK_TAG, "", 0); + if (ret) + return ret; + + rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; + rbd_set_owner_cid(rbd_dev, &cid); + queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); + return 0; +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_unlock(struct rbd_device *rbd_dev) { - struct rbd_device *rbd_dev = arg; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + char cookie[32]; int ret; - dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev, - cookie, notify_id); + WARN_ON(!__rbd_is_lock_owner(rbd_dev)); + + rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; + + format_lock_cookie(rbd_dev, cookie); + ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, cookie); + if (ret && ret != -ENOENT) { + rbd_warn(rbd_dev, "cls_unlock failed: %d", ret); + return ret; + } + + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work); + return 0; +} + +static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, + enum rbd_notify_op notify_op, + struct page ***preply_pages, + size_t *preply_len) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_client_id cid = rbd_get_cid(rbd_dev); + int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN; + char buf[buf_size]; + void *p = buf; + + dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); + + /* encode *LockPayload NotifyMessage (op + ClientId) */ + ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_32(&p, notify_op); + ceph_encode_64(&p, cid.gid); + ceph_encode_64(&p, cid.handle); + + return ceph_osdc_notify(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, buf, buf_size, + RBD_NOTIFY_TIMEOUT, preply_pages, preply_len); +} + +static void rbd_notify_op_lock(struct rbd_device *rbd_dev, + enum rbd_notify_op notify_op) +{ + struct page **reply_pages; + size_t reply_len; + + __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len); + ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); +} + +static void rbd_notify_acquired_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + acquired_lock_work); + + rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK); +} + +static void rbd_notify_released_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + released_lock_work); + + rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK); +} + +static int rbd_request_lock(struct rbd_device *rbd_dev) +{ + struct page **reply_pages; + size_t reply_len; + bool lock_owner_responded = false; + int ret; + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK, + &reply_pages, &reply_len); + if (ret && ret != -ETIMEDOUT) { + rbd_warn(rbd_dev, "failed to request lock: %d", ret); + goto out; + } + + if (reply_len > 0 && reply_len <= PAGE_SIZE) { + void *p = page_address(reply_pages[0]); + void *const end = p + reply_len; + u32 n; + + ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */ + while (n--) { + u8 struct_v; + u32 len; + + ceph_decode_need(&p, end, 8 + 8, e_inval); + p += 8 + 8; /* skip gid and cookie */ + + ceph_decode_32_safe(&p, end, len, e_inval); + if (!len) + continue; + + if (lock_owner_responded) { + rbd_warn(rbd_dev, + "duplicate lock owners detected"); + ret = -EIO; + goto out; + } + + lock_owner_responded = true; + ret = ceph_start_decoding(&p, end, 1, "ResponseMessage", + &struct_v, &len); + if (ret) { + rbd_warn(rbd_dev, + "failed to decode ResponseMessage: %d", + ret); + goto e_inval; + } + + ret = ceph_decode_32(&p); + } + } + + if (!lock_owner_responded) { + rbd_warn(rbd_dev, "no lock owners detected"); + ret = -ETIMEDOUT; + } + +out: + ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + +static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) +{ + dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); + + cancel_delayed_work(&rbd_dev->lock_dwork); + if (wake_all) + wake_up_all(&rbd_dev->lock_waitq); + else + wake_up(&rbd_dev->lock_waitq); +} + +static int get_lock_owner_info(struct rbd_device *rbd_dev, + struct ceph_locker **lockers, u32 *num_lockers) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + u8 lock_type; + char *lock_tag; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, RBD_LOCK_NAME, + &lock_type, &lock_tag, lockers, num_lockers); + if (ret) + return ret; + + if (*num_lockers == 0) { + dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev); + goto out; + } + + if (strcmp(lock_tag, RBD_LOCK_TAG)) { + rbd_warn(rbd_dev, "locked by external mechanism, tag %s", + lock_tag); + ret = -EBUSY; + goto out; + } + + if (lock_type == CEPH_CLS_LOCK_SHARED) { + rbd_warn(rbd_dev, "shared lock type detected"); + ret = -EBUSY; + goto out; + } + + if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, + strlen(RBD_LOCK_COOKIE_PREFIX))) { + rbd_warn(rbd_dev, "locked by external mechanism, cookie %s", + (*lockers)[0].id.cookie); + ret = -EBUSY; + goto out; + } + +out: + kfree(lock_tag); + return ret; +} + +static int find_watcher(struct rbd_device *rbd_dev, + const struct ceph_locker *locker) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_watch_item *watchers; + u32 num_watchers; + u64 cookie; + int i; + int ret; + + ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, &watchers, + &num_watchers); + if (ret) + return ret; + + sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); + for (i = 0; i < num_watchers; i++) { + if (!memcmp(&watchers[i].addr, &locker->info.addr, + sizeof(locker->info.addr)) && + watchers[i].cookie == cookie) { + struct rbd_client_id cid = { + .gid = le64_to_cpu(watchers[i].name.num), + .handle = cookie, + }; + + dout("%s rbd_dev %p found cid %llu-%llu\n", __func__, + rbd_dev, cid.gid, cid.handle); + rbd_set_owner_cid(rbd_dev, &cid); + ret = 1; + goto out; + } + } + + dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev); + ret = 0; +out: + kfree(watchers); + return ret; +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_try_lock(struct rbd_device *rbd_dev) +{ + struct ceph_client *client = rbd_dev->rbd_client->client; + struct ceph_locker *lockers; + u32 num_lockers; + int ret; + + for (;;) { + ret = rbd_lock(rbd_dev); + if (ret != -EBUSY) + return ret; + + /* determine if the current lock holder is still alive */ + ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers); + if (ret) + return ret; + + if (num_lockers == 0) + goto again; + + ret = find_watcher(rbd_dev, lockers); + if (ret) { + if (ret > 0) + ret = 0; /* have to request lock */ + goto out; + } + + rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", + ENTITY_NAME(lockers[0].id.name)); + + ret = ceph_monc_blacklist_add(&client->monc, + &lockers[0].info.addr); + if (ret) { + rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d", + ENTITY_NAME(lockers[0].id.name), ret); + goto out; + } + + ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, RBD_LOCK_NAME, + lockers[0].id.cookie, + &lockers[0].id.name); + if (ret && ret != -ENOENT) + goto out; + +again: + ceph_free_lockers(lockers, num_lockers); + } + +out: + ceph_free_lockers(lockers, num_lockers); + return ret; +} + +/* + * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED + */ +static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, + int *pret) +{ + enum rbd_lock_state lock_state; + + down_read(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (__rbd_is_lock_owner(rbd_dev)) { + lock_state = rbd_dev->lock_state; + up_read(&rbd_dev->lock_rwsem); + return lock_state; + } + + up_read(&rbd_dev->lock_rwsem); + down_write(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (!__rbd_is_lock_owner(rbd_dev)) { + *pret = rbd_try_lock(rbd_dev); + if (*pret) + rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); + } + + lock_state = rbd_dev->lock_state; + up_write(&rbd_dev->lock_rwsem); + return lock_state; +} + +static void rbd_acquire_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(to_delayed_work(work), + struct rbd_device, lock_dwork); + enum rbd_lock_state lock_state; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); +again: + lock_state = rbd_try_acquire_lock(rbd_dev, &ret); + if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { + if (lock_state == RBD_LOCK_STATE_LOCKED) + wake_requests(rbd_dev, true); + dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, + rbd_dev, lock_state, ret); + return; + } + + ret = rbd_request_lock(rbd_dev); + if (ret == -ETIMEDOUT) { + goto again; /* treat this as a dead client */ + } else if (ret < 0) { + rbd_warn(rbd_dev, "error requesting lock: %d", ret); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, + RBD_RETRY_DELAY); + } else { + /* + * lock owner acked, but resend if we don't see them + * release the lock + */ + dout("%s rbd_dev %p requeueing lock_dwork\n", __func__, + rbd_dev); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, + msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC)); + } +} + +/* + * lock_rwsem must be held for write + */ +static bool rbd_release_lock(struct rbd_device *rbd_dev) +{ + dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) + return false; + + rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; + downgrade_write(&rbd_dev->lock_rwsem); /* - * Until adequate refresh error handling is in place, there is - * not much we can do here, except warn. + * Ensure that all in-flight IO is flushed. * - * See http://tracker.ceph.com/issues/5040 + * FIXME: ceph_osdc_sync() flushes the entire OSD client, which + * may be shared with other devices. */ - ret = rbd_dev_refresh(rbd_dev); - if (ret) - rbd_warn(rbd_dev, "refresh failed: %d", ret); + ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); + up_read(&rbd_dev->lock_rwsem); + + down_write(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) + return false; + + if (!rbd_unlock(rbd_dev)) + /* + * Give others a chance to grab the lock - we would re-acquire + * almost immediately if we got new IO during ceph_osdc_sync() + * otherwise. We need to ack our own notifications, so this + * lock_dwork will be requeued from rbd_wait_state_locked() + * after wake_requests() in rbd_handle_released_lock(). + */ + cancel_delayed_work(&rbd_dev->lock_dwork); + + return true; +} + +static void rbd_release_lock_work(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + unlock_work); + + down_write(&rbd_dev->lock_rwsem); + rbd_release_lock(rbd_dev); + up_write(&rbd_dev->lock_rwsem); +} + +static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id cid = { 0 }; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { + down_write(&rbd_dev->lock_rwsem); + if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { + /* + * we already know that the remote client is + * the owner + */ + up_write(&rbd_dev->lock_rwsem); + return; + } + + rbd_set_owner_cid(rbd_dev, &cid); + downgrade_write(&rbd_dev->lock_rwsem); + } else { + down_read(&rbd_dev->lock_rwsem); + } + + if (!__rbd_is_lock_owner(rbd_dev)) + wake_requests(rbd_dev, false); + up_read(&rbd_dev->lock_rwsem); +} + +static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id cid = { 0 }; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { + down_write(&rbd_dev->lock_rwsem); + if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { + dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n", + __func__, rbd_dev, cid.gid, cid.handle, + rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle); + up_write(&rbd_dev->lock_rwsem); + return; + } + + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + downgrade_write(&rbd_dev->lock_rwsem); + } else { + down_read(&rbd_dev->lock_rwsem); + } + + if (!__rbd_is_lock_owner(rbd_dev)) + wake_requests(rbd_dev, false); + up_read(&rbd_dev->lock_rwsem); +} + +static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id my_cid = rbd_get_cid(rbd_dev); + struct rbd_client_id cid = { 0 }; + bool need_to_send; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (rbd_cid_equal(&cid, &my_cid)) + return false; + + down_read(&rbd_dev->lock_rwsem); + need_to_send = __rbd_is_lock_owner(rbd_dev); + if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) { + if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) { + dout("%s rbd_dev %p queueing unlock_work\n", __func__, + rbd_dev); + queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work); + } + } + up_read(&rbd_dev->lock_rwsem); + return need_to_send; +} + +static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, + u64 notify_id, u64 cookie, s32 *result) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN; + char buf[buf_size]; + int ret; + + if (result) { + void *p = buf; + + /* encode ResponseMessage */ + ceph_start_encoding(&p, 1, 1, + buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_32(&p, *result); + } else { + buf_size = 0; + } ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, notify_id, cookie, - NULL, 0); + buf, buf_size); if (ret) - rbd_warn(rbd_dev, "notify_ack ret %d", ret); + rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret); +} + +static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, + u64 cookie) +{ + dout("%s rbd_dev %p\n", __func__, rbd_dev); + __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL); +} + +static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev, + u64 notify_id, u64 cookie, s32 result) +{ + dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); + __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result); +} + +static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, + u64 notifier_id, void *data, size_t data_len) +{ + struct rbd_device *rbd_dev = arg; + void *p = data; + void *const end = p + data_len; + u8 struct_v; + u32 len; + u32 notify_op; + int ret; + + dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n", + __func__, rbd_dev, cookie, notify_id, data_len); + if (data_len) { + ret = ceph_start_decoding(&p, end, 1, "NotifyMessage", + &struct_v, &len); + if (ret) { + rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d", + ret); + return; + } + + notify_op = ceph_decode_32(&p); + } else { + /* legacy notification for header updates */ + notify_op = RBD_NOTIFY_OP_HEADER_UPDATE; + len = 0; + } + + dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op); + switch (notify_op) { + case RBD_NOTIFY_OP_ACQUIRED_LOCK: + rbd_handle_acquired_lock(rbd_dev, struct_v, &p); + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_RELEASED_LOCK: + rbd_handle_released_lock(rbd_dev, struct_v, &p); + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_REQUEST_LOCK: + if (rbd_handle_request_lock(rbd_dev, struct_v, &p)) + /* + * send ResponseMessage(0) back so the client + * can detect a missing owner + */ + rbd_acknowledge_notify_result(rbd_dev, notify_id, + cookie, 0); + else + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_HEADER_UPDATE: + ret = rbd_dev_refresh(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "refresh failed: %d", ret); + + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + default: + if (rbd_is_lock_owner(rbd_dev)) + rbd_acknowledge_notify_result(rbd_dev, notify_id, + cookie, -EOPNOTSUPP); + else + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + } } static void __rbd_unregister_watch(struct rbd_device *rbd_dev); @@ -3130,6 +3837,10 @@ static void rbd_watch_errcb(void *arg, u64 cookie, int err) rbd_warn(rbd_dev, "encountered watch error: %d", err); + down_write(&rbd_dev->lock_rwsem); + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + up_write(&rbd_dev->lock_rwsem); + mutex_lock(&rbd_dev->watch_mutex); if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) { __rbd_unregister_watch(rbd_dev); @@ -3202,10 +3913,15 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev) dout("%s rbd_dev %p\n", __func__, rbd_dev); cancel_delayed_work_sync(&rbd_dev->watch_dwork); + cancel_work_sync(&rbd_dev->acquired_lock_work); + cancel_work_sync(&rbd_dev->released_lock_work); + cancel_delayed_work_sync(&rbd_dev->lock_dwork); + cancel_work_sync(&rbd_dev->unlock_work); } static void rbd_unregister_watch(struct rbd_device *rbd_dev) { + WARN_ON(waitqueue_active(&rbd_dev->lock_waitq)); cancel_tasks_sync(rbd_dev); mutex_lock(&rbd_dev->watch_mutex); @@ -3221,10 +3937,15 @@ static void rbd_reregister_watch(struct work_struct *work) { struct rbd_device *rbd_dev = container_of(to_delayed_work(work), struct rbd_device, watch_dwork); + bool was_lock_owner = false; int ret; dout("%s rbd_dev %p\n", __func__, rbd_dev); + down_write(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) + was_lock_owner = rbd_release_lock(rbd_dev); + mutex_lock(&rbd_dev->watch_mutex); if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) goto fail_unlock; @@ -3247,10 +3968,20 @@ static void rbd_reregister_watch(struct work_struct *work) if (ret) rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); + if (was_lock_owner) { + ret = rbd_try_lock(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "reregisteration lock failed: %d", + ret); + } + + up_write(&rbd_dev->lock_rwsem); + wake_requests(rbd_dev, true); return; fail_unlock: mutex_unlock(&rbd_dev->watch_mutex); + up_write(&rbd_dev->lock_rwsem); } /* @@ -3340,6 +4071,29 @@ out: return ret; } +/* + * lock_rwsem must be held for read + */ +static void rbd_wait_state_locked(struct rbd_device *rbd_dev) +{ + DEFINE_WAIT(wait); + + do { + /* + * Note the use of mod_delayed_work() in rbd_acquire_lock() + * and cancel_delayed_work() in wake_requests(). + */ + dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, + TASK_UNINTERRUPTIBLE); + up_read(&rbd_dev->lock_rwsem); + schedule(); + down_read(&rbd_dev->lock_rwsem); + } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); + finish_wait(&rbd_dev->lock_waitq, &wait); +} + static void rbd_queue_workfn(struct work_struct *work) { struct request *rq = blk_mq_rq_from_pdu(work); @@ -3350,6 +4104,7 @@ static void rbd_queue_workfn(struct work_struct *work) u64 length = blk_rq_bytes(rq); enum obj_operation_type op_type; u64 mapping_size; + bool must_be_locked = false; int result; if (rq->cmd_type != REQ_TYPE_FS) { @@ -3411,6 +4166,7 @@ static void rbd_queue_workfn(struct work_struct *work) if (op_type != OBJ_OP_READ) { snapc = rbd_dev->header.snapc; ceph_get_snap_context(snapc); + must_be_locked = rbd_is_lock_supported(rbd_dev); } up_read(&rbd_dev->header_rwsem); @@ -3421,11 +4177,17 @@ static void rbd_queue_workfn(struct work_struct *work) goto err_rq; } + if (must_be_locked) { + down_read(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) + rbd_wait_state_locked(rbd_dev); + } + img_request = rbd_img_request_create(rbd_dev, offset, length, op_type, snapc); if (!img_request) { result = -ENOMEM; - goto err_rq; + goto err_unlock; } img_request->rq = rq; snapc = NULL; /* img_request consumes a ref */ @@ -3443,10 +4205,15 @@ static void rbd_queue_workfn(struct work_struct *work) if (result) goto err_img_request; + if (must_be_locked) + up_read(&rbd_dev->lock_rwsem); return; err_img_request: rbd_img_request_put(img_request); +err_unlock: + if (must_be_locked) + up_read(&rbd_dev->lock_rwsem); err_rq: if (result) rbd_warn(rbd_dev, "%s %llx at %llx result %d", @@ -4020,6 +4787,7 @@ static void rbd_spec_free(struct kref *kref) static void rbd_dev_free(struct rbd_device *rbd_dev) { WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED); + WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED); ceph_oid_destroy(&rbd_dev->header_oid); ceph_oloc_destroy(&rbd_dev->header_oloc); @@ -4071,6 +4839,14 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch); + init_rwsem(&rbd_dev->lock_rwsem); + rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; + INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock); + INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); + INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); + INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); + init_waitqueue_head(&rbd_dev->lock_waitq); + rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; rbd_dev->dev.parent = &rbd_root_dev; @@ -5553,6 +6329,10 @@ static ssize_t do_rbd_remove(struct bus_type *bus, if (ret < 0 || already) return ret; + down_write(&rbd_dev->lock_rwsem); + if (__rbd_is_lock_owner(rbd_dev)) + rbd_unlock(rbd_dev); + up_write(&rbd_dev->lock_rwsem); rbd_unregister_watch(rbd_dev); /* diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h index 49d77cbcf8bd..94f367db27b0 100644 --- a/drivers/block/rbd_types.h +++ b/drivers/block/rbd_types.h @@ -28,6 +28,17 @@ #define RBD_DATA_PREFIX "rbd_data." #define RBD_ID_PREFIX "rbd_id." +#define RBD_LOCK_NAME "rbd_lock" +#define RBD_LOCK_TAG "internal" +#define RBD_LOCK_COOKIE_PREFIX "auto" + +enum rbd_notify_op { + RBD_NOTIFY_OP_ACQUIRED_LOCK = 0, + RBD_NOTIFY_OP_RELEASED_LOCK = 1, + RBD_NOTIFY_OP_REQUEST_LOCK = 2, + RBD_NOTIFY_OP_HEADER_UPDATE = 3, +}; + /* * For format version 1, rbd image 'foo' consists of objects * foo.rbd - image metadata diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c index 3773a4fa11e3..19b7d8aa915c 100644 --- a/net/ceph/ceph_strings.c +++ b/net/ceph/ceph_strings.c @@ -15,6 +15,7 @@ const char *ceph_entity_type_name(int type) default: return "unknown"; } } +EXPORT_SYMBOL(ceph_entity_type_name); const char *ceph_osd_op_name(int op) { -- cgit v1.2.3 From ca7909e8bbb4ddc549fa1e8afa695f147bb6358c Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 18 Aug 2016 18:38:41 +0200 Subject: rbd: print capacity in decimal and features in hex With exclusive-lock added and more to come, print features into dmesg. Change capacity to decimal while at it. Signed-off-by: Ilya Dryomov Reviewed-by: Mike Christie --- drivers/block/rbd.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 7cda1cc60c2c..fd1a9891b348 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -6006,8 +6006,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) spin_unlock(&rbd_dev_list_lock); add_disk(rbd_dev->disk); - pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, - (unsigned long long) rbd_dev->mapping.size); + pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name, + (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT, + rbd_dev->header.features); return ret; -- cgit v1.2.3 From 005a07bf0a92e7f0e73fc9a6c9acc992c5dbd00c Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 18 Aug 2016 18:38:43 +0200 Subject: rbd: add 'client_addr' sysfs rbd device attribute Export client addr/nonce, so userspace can check if a image is being blacklisted. Signed-off-by: Mike Christie [idryomov@gmail.com: ceph_client_addr(), endianess fix] Signed-off-by: Ilya Dryomov --- Documentation/ABI/testing/sysfs-bus-rbd | 6 ++++++ drivers/block/rbd.c | 13 +++++++++++++ include/linux/ceph/libceph.h | 1 + net/ceph/ceph_common.c | 6 ++++++ 4 files changed, 26 insertions(+) (limited to 'drivers/block') diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index 2ddd680929d8..273e27f2491f 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -43,6 +43,12 @@ Description: Available only if rbd module is inserted with single_major Entries under /sys/bus/rbd/devices// -------------------------------------------- +client_addr + + The ceph unique client entity_addr_t (address + nonce). + The format is
:/: '1.2.3.4:1234/5678' or + '[1:2:3:4:5:6:7:8]:1234/5678'. (August 2016, since 4.9.) + client_id The ceph unique client id that was assigned for this specific session. diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index fd1a9891b348..69d76c3afcdd 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4592,6 +4592,17 @@ static ssize_t rbd_minor_show(struct device *dev, return sprintf(buf, "%d\n", rbd_dev->minor); } +static ssize_t rbd_client_addr_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + struct ceph_entity_addr *client_addr = + ceph_client_addr(rbd_dev->rbd_client->client); + + return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr, + le32_to_cpu(client_addr->nonce)); +} + static ssize_t rbd_client_id_show(struct device *dev, struct device_attribute *attr, char *buf) { @@ -4702,6 +4713,7 @@ static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); +static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL); static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); @@ -4716,6 +4728,7 @@ static struct attribute *rbd_attrs[] = { &dev_attr_features.attr, &dev_attr_major.attr, &dev_attr_minor.attr, + &dev_attr_client_addr.attr, &dev_attr_client_id.attr, &dev_attr_pool.attr, &dev_attr_pool_id.attr, diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index b4cffff70e44..1816c5e26581 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -264,6 +264,7 @@ extern struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, u64 supported_features, u64 required_features); +struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client); u64 ceph_client_gid(struct ceph_client *client); extern void ceph_destroy_client(struct ceph_client *client); extern int __ceph_open_session(struct ceph_client *client, diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 8a7921767308..464e88599b9d 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -566,6 +566,12 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client) } EXPORT_SYMBOL(ceph_print_client_options); +struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client) +{ + return &client->msgr.inst.addr; +} +EXPORT_SYMBOL(ceph_client_addr); + u64 ceph_client_gid(struct ceph_client *client) { return client->monc.auth->global_id; -- cgit v1.2.3 From 267fb90b8344eeb6f835734e356b422f78617088 Mon Sep 17 00:00:00 2001 From: Mike Christie Date: Thu, 18 Aug 2016 18:38:43 +0200 Subject: rbd: add 'cluster_fsid' sysfs rbd device attribute Export the cluster fsid, so tools like udev and multipath-tools can use it for part of the uuid. Signed-off-by: Mike Christie Signed-off-by: Ilya Dryomov --- Documentation/ABI/testing/sysfs-bus-rbd | 4 ++++ drivers/block/rbd.c | 10 ++++++++++ 2 files changed, 14 insertions(+) (limited to 'drivers/block') diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index 273e27f2491f..9a655f3f4386 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -53,6 +53,10 @@ client_id The ceph unique client id that was assigned for this specific session. +cluster_fsid + + The ceph cluster UUID. (August 2016, since 4.9.) + features A hexadecimal encoding of the feature bits for this image. diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 69d76c3afcdd..c95104a80065 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4612,6 +4612,14 @@ static ssize_t rbd_client_id_show(struct device *dev, ceph_client_gid(rbd_dev->rbd_client->client)); } +static ssize_t rbd_cluster_fsid_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid); +} + static ssize_t rbd_pool_show(struct device *dev, struct device_attribute *attr, char *buf) { @@ -4715,6 +4723,7 @@ static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL); static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); +static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL); static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); @@ -4730,6 +4739,7 @@ static struct attribute *rbd_attrs[] = { &dev_attr_minor.attr, &dev_attr_client_addr.attr, &dev_attr_client_id.attr, + &dev_attr_cluster_fsid.attr, &dev_attr_pool.attr, &dev_attr_pool_id.attr, &dev_attr_name.attr, -- cgit v1.2.3 From 92a58671549f365a962517cc7cccb624dea8581e Mon Sep 17 00:00:00 2001 From: Mike Christie Date: Thu, 18 Aug 2016 18:38:44 +0200 Subject: rbd: add 'snap_id' sysfs rbd device attribute Export snap id in sysfs, so tools like multipathd can use it in a uuid. Signed-off-by: Mike Christie Signed-off-by: Ilya Dryomov --- Documentation/ABI/testing/sysfs-bus-rbd | 4 ++++ drivers/block/rbd.c | 10 ++++++++++ 2 files changed, 14 insertions(+) (limited to 'drivers/block') diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index 9a655f3f4386..7bf8d4fa6f63 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -102,6 +102,10 @@ current_snap The current snapshot for which the device is mapped. +snap_id + + The current snapshot's id. (August 2016, since 4.9.) + parent Information identifying the chain of parent images in a layered rbd diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index c95104a80065..36ebec19dc20 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4669,6 +4669,14 @@ static ssize_t rbd_snap_show(struct device *dev, return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); } +static ssize_t rbd_snap_id_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id); +} + /* * For a v2 image, shows the chain of parent images, separated by empty * lines. For v1 images or if there is no parent, shows "(no parent @@ -4730,6 +4738,7 @@ static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); +static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); static struct attribute *rbd_attrs[] = { @@ -4745,6 +4754,7 @@ static struct attribute *rbd_attrs[] = { &dev_attr_name.attr, &dev_attr_image_id.attr, &dev_attr_current_snap.attr, + &dev_attr_snap_id.attr, &dev_attr_parent.attr, &dev_attr_refresh.attr, NULL -- cgit v1.2.3 From 0d6d1e9c2e970c26e8a1ec4932ffffacec90e0b4 Mon Sep 17 00:00:00 2001 From: Mike Christie Date: Thu, 18 Aug 2016 18:38:45 +0200 Subject: rbd: add 'config_info' sysfs rbd device attribute Export the info used to setup the rbd image, so it can be used to remap the image. Signed-off-by: Mike Christie [idryomov@gmail.com: do_rbd_add() EH] Signed-off-by: Ilya Dryomov --- Documentation/ABI/testing/sysfs-bus-rbd | 5 +++++ drivers/block/rbd.c | 23 +++++++++++++++++++++-- 2 files changed, 26 insertions(+), 2 deletions(-) (limited to 'drivers/block') diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index 7bf8d4fa6f63..6dccbf82fcf4 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -57,6 +57,11 @@ cluster_fsid The ceph cluster UUID. (August 2016, since 4.9.) +config_info + + The string written into /sys/bus/rbd/add{,_single_major}. (August + 2016, since 4.9.) + features A hexadecimal encoding of the feature bits for this image. diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 36ebec19dc20..8ff2dc872008 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -370,6 +370,7 @@ struct rbd_device { unsigned long flags; /* possibly lock protected */ struct rbd_spec *spec; struct rbd_options *opts; + char *config_info; /* add{,_single_major} string */ struct ceph_object_id header_oid; struct ceph_object_locator header_oloc; @@ -4620,6 +4621,14 @@ static ssize_t rbd_cluster_fsid_show(struct device *dev, return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid); } +static ssize_t rbd_config_info_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%s\n", rbd_dev->config_info); +} + static ssize_t rbd_pool_show(struct device *dev, struct device_attribute *attr, char *buf) { @@ -4732,6 +4741,7 @@ static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL); static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL); +static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL); static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); @@ -4749,6 +4759,7 @@ static struct attribute *rbd_attrs[] = { &dev_attr_client_addr.attr, &dev_attr_client_id.attr, &dev_attr_cluster_fsid.attr, + &dev_attr_config_info.attr, &dev_attr_pool.attr, &dev_attr_pool_id.attr, &dev_attr_name.attr, @@ -4824,6 +4835,7 @@ static void rbd_dev_free(struct rbd_device *rbd_dev) ceph_oid_destroy(&rbd_dev->header_oid); ceph_oloc_destroy(&rbd_dev->header_oloc); + kfree(rbd_dev->config_info); rbd_put_client(rbd_dev->rbd_client); rbd_spec_put(rbd_dev->spec); @@ -6223,10 +6235,18 @@ static ssize_t do_rbd_add(struct bus_type *bus, spec = NULL; /* rbd_dev now owns this */ rbd_opts = NULL; /* rbd_dev now owns this */ + rbd_dev->config_info = kstrdup(buf, GFP_KERNEL); + if (!rbd_dev->config_info) { + rc = -ENOMEM; + goto err_out_rbd_dev; + } + down_write(&rbd_dev->header_rwsem); rc = rbd_dev_image_probe(rbd_dev, 0); - if (rc < 0) + if (rc < 0) { + up_write(&rbd_dev->header_rwsem); goto err_out_rbd_dev; + } /* If we are mapping a snapshot it must be marked read-only */ @@ -6253,7 +6273,6 @@ out: return rc; err_out_rbd_dev: - up_write(&rbd_dev->header_rwsem); rbd_dev_destroy(rbd_dev); err_out_client: rbd_put_client(rbdc); -- cgit v1.2.3 From 0276dca6c1ecb9a665645ff573e70685a57759af Mon Sep 17 00:00:00 2001 From: Mike Christie Date: Thu, 18 Aug 2016 18:38:45 +0200 Subject: rbd: add force close option This adds a force close option, so we can force the unmapping of a rbd device that is open. If a path/device is blacklisted, apps like multipathd can map a new device and then unmap the old one. The unmapping cleanup would then be handled by the generic hotunplug code paths in multipahd like is done for iSCSI, FC/FCOE, SAS, etc. Signed-off-by: Mike Christie Signed-off-by: Ilya Dryomov --- Documentation/ABI/testing/sysfs-bus-rbd | 10 +++++++--- drivers/block/rbd.c | 35 ++++++++++++++++++++++++--------- 2 files changed, 33 insertions(+), 12 deletions(-) (limited to 'drivers/block') diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index 6dccbf82fcf4..f208ac58d613 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -6,7 +6,7 @@ Description: Being used for adding and removing rbd block devices. -Usage: [snap name] +Usage: [] $ echo "192.168.0.1 name=admin rbd foo" > /sys/bus/rbd/add @@ -14,9 +14,13 @@ The snapshot name can be "-" or omitted to map the image read/write. A will be assigned for any registered block device. If snapshot is used, it will be mapped read-only. -Removal of a device: +Usage: [force] - $ echo > /sys/bus/rbd/remove + $ echo 2 > /sys/bus/rbd/remove + +Optional "force" argument which when passed will wait for running requests and +then unmap the image. Requests sent to the driver after initiating the removal +will be failed. (August 2016, since 4.9.) What: /sys/bus/rbd/add_single_major Date: December 2013 diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 8ff2dc872008..35fc1da6c83d 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -6347,18 +6347,26 @@ static ssize_t do_rbd_remove(struct bus_type *bus, struct rbd_device *rbd_dev = NULL; struct list_head *tmp; int dev_id; - unsigned long ul; + char opt_buf[6]; bool already = false; + bool force = false; int ret; - ret = kstrtoul(buf, 10, &ul); - if (ret) - return ret; - - /* convert to int; abort if we lost anything in the conversion */ - dev_id = (int)ul; - if (dev_id != ul) + dev_id = -1; + opt_buf[0] = '\0'; + sscanf(buf, "%d %5s", &dev_id, opt_buf); + if (dev_id < 0) { + pr_err("dev_id out of range\n"); return -EINVAL; + } + if (opt_buf[0] != '\0') { + if (!strcmp(opt_buf, "force")) { + force = true; + } else { + pr_err("bad remove option at '%s'\n", opt_buf); + return -EINVAL; + } + } ret = -ENOENT; spin_lock(&rbd_dev_list_lock); @@ -6371,7 +6379,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus, } if (!ret) { spin_lock_irq(&rbd_dev->lock); - if (rbd_dev->open_count) + if (rbd_dev->open_count && !force) ret = -EBUSY; else already = test_and_set_bit(RBD_DEV_FLAG_REMOVING, @@ -6382,6 +6390,15 @@ static ssize_t do_rbd_remove(struct bus_type *bus, if (ret < 0 || already) return ret; + if (force) { + /* + * Prevent new IO from being queued and wait for existing + * IO to complete/fail. + */ + blk_mq_freeze_queue(rbd_dev->disk->queue); + blk_set_queue_dying(rbd_dev->disk->queue); + } + down_write(&rbd_dev->lock_rwsem); if (__rbd_is_lock_owner(rbd_dev)) rbd_unlock(rbd_dev); -- cgit v1.2.3 From 80de19122866d0a65f741e7ff2d5d20842d22d6b Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 20 Sep 2016 14:23:17 +0200 Subject: rbd: lock_on_read map option Add a per-device option to acquire exclusive lock on reads (in addition to writes and discards). The use case is iSCSI, where it will be used to prevent execution of stale writes after the implicit failover. Signed-off-by: Ilya Dryomov Tested-by: Mike Christie --- drivers/block/rbd.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 35fc1da6c83d..cc4c9f4c0d97 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -795,6 +795,7 @@ enum { /* string args above */ Opt_read_only, Opt_read_write, + Opt_lock_on_read, Opt_err }; @@ -806,16 +807,19 @@ static match_table_t rbd_opts_tokens = { {Opt_read_only, "ro"}, /* Alternate spelling */ {Opt_read_write, "read_write"}, {Opt_read_write, "rw"}, /* Alternate spelling */ + {Opt_lock_on_read, "lock_on_read"}, {Opt_err, NULL} }; struct rbd_options { int queue_depth; bool read_only; + bool lock_on_read; }; #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ #define RBD_READ_ONLY_DEFAULT false +#define RBD_LOCK_ON_READ_DEFAULT false static int parse_rbd_opts_token(char *c, void *private) { @@ -851,6 +855,9 @@ static int parse_rbd_opts_token(char *c, void *private) case Opt_read_write: rbd_opts->read_only = false; break; + case Opt_lock_on_read: + rbd_opts->lock_on_read = true; + break; default: /* libceph prints "bad option" msg */ return -EINVAL; @@ -4105,7 +4112,7 @@ static void rbd_queue_workfn(struct work_struct *work) u64 length = blk_rq_bytes(rq); enum obj_operation_type op_type; u64 mapping_size; - bool must_be_locked = false; + bool must_be_locked; int result; if (rq->cmd_type != REQ_TYPE_FS) { @@ -4168,6 +4175,9 @@ static void rbd_queue_workfn(struct work_struct *work) snapc = rbd_dev->header.snapc; ceph_get_snap_context(snapc); must_be_locked = rbd_is_lock_supported(rbd_dev); + } else { + must_be_locked = rbd_dev->opts->lock_on_read && + rbd_is_lock_supported(rbd_dev); } up_read(&rbd_dev->header_rwsem); @@ -5757,6 +5767,7 @@ static int rbd_add_parse_args(const char *buf, rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; + rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; copts = ceph_parse_options(options, mon_addrs, mon_addrs + mon_addrs_size - 1, -- cgit v1.2.3 From 980917fc6ec94cb614fd79e6a124689e700f9d97 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 12 Sep 2016 18:59:42 +0200 Subject: rbd: change rbd_obj_request_submit() signature - osdc parameter is useless - starting with commit 5aea3dcd5021 ("libceph: a major OSD client update"), ceph_osdc_start_request() always returns success Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: David Disseldorp --- drivers/block/rbd.c | 70 ++++++++++++++++++----------------------------------- 1 file changed, 23 insertions(+), 47 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index cc4c9f4c0d97..5c422402e408 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1618,11 +1618,12 @@ static bool obj_request_type_valid(enum obj_request_type type) } } -static int rbd_obj_request_submit(struct ceph_osd_client *osdc, - struct rbd_obj_request *obj_request) +static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) { - dout("%s %p\n", __func__, obj_request); - return ceph_osdc_start_request(osdc, obj_request->osd_req, false); + struct ceph_osd_request *osd_req = obj_request->osd_req; + + dout("%s %p osd_req %p\n", __func__, obj_request, osd_req); + ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); } static void rbd_obj_request_end(struct rbd_obj_request *obj_request) @@ -2646,7 +2647,6 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) { struct rbd_obj_request *orig_request; struct ceph_osd_request *osd_req; - struct ceph_osd_client *osdc; struct rbd_device *rbd_dev; struct page **pages; enum obj_operation_type op_type; @@ -2683,13 +2683,9 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) * and re-submit the original write request. */ if (!rbd_dev->parent_overlap) { - struct ceph_osd_client *osdc; - ceph_release_page_vector(pages, page_count); - osdc = &rbd_dev->rbd_client->client->osdc; - img_result = rbd_obj_request_submit(osdc, orig_request); - if (!img_result) - return; + rbd_obj_request_submit(orig_request); + return; } if (img_result) @@ -2723,10 +2719,9 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) /* All set, send it off. */ - osdc = &rbd_dev->rbd_client->client->osdc; - img_result = rbd_obj_request_submit(osdc, orig_request); - if (!img_result) - return; + rbd_obj_request_submit(orig_request); + return; + out_err: /* Record the error code and complete the request */ @@ -2860,17 +2855,13 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) /* * If the overlap has become 0 (most likely because the - * image has been flattened) we need to free the pages - * and re-submit the original write request. + * image has been flattened) we need to re-submit the + * original request. */ rbd_dev = orig_request->img_request->rbd_dev; if (!rbd_dev->parent_overlap) { - struct ceph_osd_client *osdc; - - osdc = &rbd_dev->rbd_client->client->osdc; - result = rbd_obj_request_submit(osdc, orig_request); - if (!result) - return; + rbd_obj_request_submit(orig_request); + return; } /* @@ -2902,7 +2893,6 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) { struct rbd_obj_request *stat_request; struct rbd_device *rbd_dev; - struct ceph_osd_client *osdc; struct page **pages = NULL; u32 page_count; size_t size; @@ -2946,8 +2936,9 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) false, false); rbd_osd_req_format_read(stat_request); - osdc = &rbd_dev->rbd_client->client->osdc; - ret = rbd_obj_request_submit(osdc, stat_request); + rbd_obj_request_submit(stat_request); + return 0; + out: if (ret) rbd_obj_request_put(obj_request); @@ -3004,13 +2995,8 @@ static bool img_obj_request_simple(struct rbd_obj_request *obj_request) static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) { if (img_obj_request_simple(obj_request)) { - struct rbd_device *rbd_dev; - struct ceph_osd_client *osdc; - - rbd_dev = obj_request->img_request->rbd_dev; - osdc = &rbd_dev->rbd_client->client->osdc; - - return rbd_obj_request_submit(osdc, obj_request); + rbd_obj_request_submit(obj_request); + return 0; } /* @@ -3073,12 +3059,8 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) rbd_assert(obj_request->img_request); rbd_dev = obj_request->img_request->rbd_dev; if (!rbd_dev->parent_overlap) { - struct ceph_osd_client *osdc; - - osdc = &rbd_dev->rbd_client->client->osdc; - img_result = rbd_obj_request_submit(osdc, obj_request); - if (!img_result) - return; + rbd_obj_request_submit(obj_request); + return; } obj_request->result = img_result; @@ -4005,7 +3987,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, void *inbound, size_t inbound_size) { - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; struct page **pages; u32 page_count; @@ -4056,9 +4037,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, 0, false, false); rbd_osd_req_format_read(obj_request); - ret = rbd_obj_request_submit(osdc, obj_request); - if (ret) - goto out; + rbd_obj_request_submit(obj_request); ret = rbd_obj_request_wait(obj_request); if (ret) goto out; @@ -4266,7 +4245,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, u64 offset, u64 length, void *buf) { - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; struct page **pages = NULL; u32 page_count; @@ -4301,9 +4279,7 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, false, false); rbd_osd_req_format_read(obj_request); - ret = rbd_obj_request_submit(osdc, obj_request); - if (ret) - goto out; + rbd_obj_request_submit(obj_request); ret = rbd_obj_request_wait(obj_request); if (ret) goto out; -- cgit v1.2.3 From 058aa9919147da9f088a96982a19ea0864139dc8 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 12 Sep 2016 14:44:45 +0200 Subject: rbd: clean up asserts in rbd_img_obj_request_submit() helpers Assert once in rbd_img_obj_request_submit(). Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: David Disseldorp --- drivers/block/rbd.c | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 5c422402e408..fb7f86d123ea 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2747,21 +2747,14 @@ out_err: */ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request = NULL; + struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; struct rbd_img_request *parent_request = NULL; - struct rbd_device *rbd_dev; u64 img_offset; u64 length; struct page **pages = NULL; u32 page_count; int result; - rbd_assert(obj_request_img_data_test(obj_request)); - rbd_assert(obj_request_type_valid(obj_request->type)); - - img_request = obj_request->img_request; - rbd_assert(img_request != NULL); - rbd_dev = img_request->rbd_dev; rbd_assert(rbd_dev->parent != NULL); /* @@ -2802,10 +2795,11 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); if (result) goto out_err; + parent_request->copyup_pages = pages; parent_request->copyup_page_count = page_count; - parent_request->callback = rbd_img_obj_parent_read_full_callback; + result = rbd_img_request_submit(parent_request); if (!result) return 0; @@ -2891,8 +2885,8 @@ out: static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) { + struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; struct rbd_obj_request *stat_request; - struct rbd_device *rbd_dev; struct page **pages = NULL; u32 page_count; size_t size; @@ -2923,8 +2917,6 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) stat_request->pages = pages; stat_request->page_count = page_count; - rbd_assert(obj_request->img_request); - rbd_dev = obj_request->img_request->rbd_dev; stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, stat_request); if (!stat_request->osd_req) @@ -2948,14 +2940,8 @@ out: static bool img_obj_request_simple(struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request; - struct rbd_device *rbd_dev; - - rbd_assert(obj_request_img_data_test(obj_request)); - - img_request = obj_request->img_request; - rbd_assert(img_request); - rbd_dev = img_request->rbd_dev; + struct rbd_img_request *img_request = obj_request->img_request; + struct rbd_device *rbd_dev = img_request->rbd_dev; /* Reads */ if (!img_request_write_test(img_request) && @@ -2994,6 +2980,10 @@ static bool img_obj_request_simple(struct rbd_obj_request *obj_request) static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) { + rbd_assert(obj_request_img_data_test(obj_request)); + rbd_assert(obj_request_type_valid(obj_request->type)); + rbd_assert(obj_request->img_request); + if (img_obj_request_simple(obj_request)) { rbd_obj_request_submit(obj_request); return 0; -- cgit v1.2.3 From c2e82414884718ad6ec33a7528606cb07cf55cb4 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 13 Sep 2016 20:18:01 +0200 Subject: rbd: mark the original request as done if stat request fails If stat request fails with something other than -ENOENT (which just means that we need to copyup), the original object request is never marked as done and therefore never completed. Fix this by moving the mark done + complete snippet from rbd_img_obj_parent_read_full() into rbd_img_obj_exists_callback(). The former remains covered, as the latter is its only caller (through rbd_img_obj_request_submit()). Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: David Disseldorp --- drivers/block/rbd.c | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index fb7f86d123ea..5e55d1c98471 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2742,8 +2742,8 @@ out_err: * When the read completes, this page array will be transferred to * the original object request for the copyup operation. * - * If an error occurs, record it as the result of the original - * object request and mark it done so it gets completed. + * If an error occurs, it is recorded as the result of the original + * object request in rbd_img_obj_exists_callback(). */ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) { @@ -2813,10 +2813,6 @@ out_err: ceph_release_page_vector(pages, page_count); if (parent_request) rbd_img_request_put(parent_request); - obj_request->result = result; - obj_request->xferred = 0; - obj_request_done_set(obj_request); - return result; } @@ -2868,19 +2864,25 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) obj_request_existence_set(orig_request, true); } else if (result == -ENOENT) { obj_request_existence_set(orig_request, false); - } else if (result) { - orig_request->result = result; - goto out; + } else { + goto fail_orig_request; } /* * Resubmit the original request now that we have recorded * whether the target object exists. */ - orig_request->result = rbd_img_obj_request_submit(orig_request); -out: - if (orig_request->result) - rbd_obj_request_complete(orig_request); + result = rbd_img_obj_request_submit(orig_request); + if (result) + goto fail_orig_request; + + return; + +fail_orig_request: + orig_request->result = result; + orig_request->xferred = 0; + obj_request_done_set(orig_request); + rbd_obj_request_complete(orig_request); } static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) -- cgit v1.2.3 From 4a17dadcae55ca1f5c1ed826d42185e22653c256 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 13 Sep 2016 21:08:10 +0200 Subject: rbd: move bumping img_request refcount into rbd_obj_request_submit() Commit 0f2d5be792b0 ("rbd: use reference counts for image requests") added rbd_img_request_get(), which rbd_img_request_fill() calls for each obj_request added to img_request. It was an urgent band-aid for the uglyness that is rbd_img_obj_callback() and none of the error paths were updated. Given that this img_request reference is meant to represent an obj_request that hasn't passed through rbd_img_obj_callback() yet, proper cleanup in appropriate destructors is a challenge. However, noting that if we don't get a chance to call rbd_obj_request_complete(), there is not going to be a call to rbd_img_obj_callback(), we can move rbd_img_request_get() into rbd_obj_request_submit() and fixup the two places that call rbd_obj_request_complete() directly and not through rbd_obj_request_submit() to temporarily bump img_request, so that rbd_img_obj_callback() can put as usual. This takes care of img_request leaks on errors on the submit side. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder --- drivers/block/rbd.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 5e55d1c98471..6db12d9a4291 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1618,11 +1618,17 @@ static bool obj_request_type_valid(enum obj_request_type type) } } +static void rbd_img_obj_callback(struct rbd_obj_request *obj_request); + static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) { struct ceph_osd_request *osd_req = obj_request->osd_req; dout("%s %p osd_req %p\n", __func__, obj_request, osd_req); + if (obj_request_img_data_test(obj_request)) { + WARN_ON(obj_request->callback != rbd_img_obj_callback); + rbd_img_request_get(obj_request->img_request); + } ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); } @@ -2588,8 +2594,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); - rbd_img_request_get(img_request); - img_offset += length; resid -= length; } @@ -2723,10 +2727,9 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) return; out_err: - /* Record the error code and complete the request */ - orig_request->result = img_result; orig_request->xferred = 0; + rbd_img_request_get(orig_request->img_request); obj_request_done_set(orig_request); rbd_obj_request_complete(orig_request); } @@ -2881,6 +2884,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) fail_orig_request: orig_request->result = result; orig_request->xferred = 0; + rbd_img_request_get(orig_request->img_request); obj_request_done_set(orig_request); rbd_obj_request_complete(orig_request); } -- cgit v1.2.3 From fa355112c2763d513f1356119684dc8a6150d08a Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 16 Sep 2016 15:20:42 +0200 Subject: rbd: don't crash or leak on errors in rbd_img_obj_parent_read_full_callback() - fix parent_length == img_request->xferred assert to not fire on copyup read failures - don't leak pages if copyup read fails or we can't allocate a new osd request Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: David Disseldorp --- drivers/block/rbd.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 6db12d9a4291..77675ac8fc4c 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2674,7 +2674,7 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) rbd_assert(obj_request_type_valid(orig_request->type)); img_result = img_request->result; parent_length = img_request->length; - rbd_assert(parent_length == img_request->xferred); + rbd_assert(img_result || parent_length == img_request->xferred); rbd_img_request_put(img_request); rbd_assert(orig_request->img_request); @@ -2727,6 +2727,7 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) return; out_err: + ceph_release_page_vector(pages, page_count); orig_request->result = img_result; orig_request->xferred = 0; rbd_img_request_get(orig_request->img_request); -- cgit v1.2.3 From 710214e391476f331abed1b774b5f025d054ab7f Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 15 Sep 2016 17:53:32 +0200 Subject: rbd: rework rbd_img_obj_exists_submit() error paths - don't put obj_request before rbd_obj_request_get() if rbd_obj_request_create() fails - don't leak pages if rbd_obj_request_create() fails - don't leak stat_request if rbd_osd_req_create() fails Reported-by: David Disseldorp Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: David Disseldorp --- drivers/block/rbd.c | 42 ++++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 77675ac8fc4c..3218ac4f2e09 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2894,11 +2894,23 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) { struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; struct rbd_obj_request *stat_request; - struct page **pages = NULL; + struct page **pages; u32 page_count; size_t size; int ret; + stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, + OBJ_REQUEST_PAGES); + if (!stat_request) + return -ENOMEM; + + stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, + stat_request); + if (!stat_request->osd_req) { + ret = -ENOMEM; + goto fail_stat_request; + } + /* * The response data for a STAT call consists of: * le64 length; @@ -2910,38 +2922,28 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); page_count = (u32)calc_pages_for(0, size); pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); + if (IS_ERR(pages)) { + ret = PTR_ERR(pages); + goto fail_stat_request; + } - ret = -ENOMEM; - stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, - OBJ_REQUEST_PAGES); - if (!stat_request) - goto out; + osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); + osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, + false, false); rbd_obj_request_get(obj_request); stat_request->obj_request = obj_request; stat_request->pages = pages; stat_request->page_count = page_count; - - stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - stat_request); - if (!stat_request->osd_req) - goto out; stat_request->callback = rbd_img_obj_exists_callback; - osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); - osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, - false, false); rbd_osd_req_format_read(stat_request); rbd_obj_request_submit(stat_request); return 0; -out: - if (ret) - rbd_obj_request_put(obj_request); - +fail_stat_request: + rbd_obj_request_put(stat_request); return ret; } -- cgit v1.2.3 From 7c84883adf6dc614fc9e01304aa1813a55c43ad2 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 15 Sep 2016 17:56:39 +0200 Subject: rbd: don't call rbd_osd_req_format_read() for !img_data requests Accessing obj_request->img_request union field is only valid for object requests associated with an image (i.e. if obj_request_img_data_test() returns true). rbd_osd_req_format_read() used to do more, but now it just sets osd_req->snap_id. Standalone and stat object requests always go to the HEAD revision and are fine with CEPH_NOSNAP set by libceph, so get around the invalid union field use by simply not calling rbd_osd_req_format_read() in those places. Reported-by: David Disseldorp Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: David Disseldorp --- drivers/block/rbd.c | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 3218ac4f2e09..02e9a0f0bf7b 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1951,11 +1951,10 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request = obj_request->img_request; struct ceph_osd_request *osd_req = obj_request->osd_req; - if (img_request) - osd_req->r_snapid = img_request->snap_id; + rbd_assert(obj_request_img_data_test(obj_request)); + osd_req->r_snapid = obj_request->img_request->snap_id; } static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) @@ -2937,8 +2936,6 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) stat_request->page_count = page_count; stat_request->callback = rbd_img_obj_exists_callback; - rbd_osd_req_format_read(stat_request); - rbd_obj_request_submit(stat_request); return 0; @@ -4034,7 +4031,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, obj_request->pages, inbound_size, 0, false, false); - rbd_osd_req_format_read(obj_request); rbd_obj_request_submit(obj_request); ret = rbd_obj_request_wait(obj_request); @@ -4276,7 +4272,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, obj_request->length, obj_request->offset & ~PAGE_MASK, false, false); - rbd_osd_req_format_read(obj_request); rbd_obj_request_submit(obj_request); ret = rbd_obj_request_wait(obj_request); -- cgit v1.2.3 From 04dc923c9e4c43df7d2d94f290189785d3172326 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 15 Sep 2016 18:05:16 +0200 Subject: rbd: img_data requests don't own their page array Move the check into rbd_obj_request_destroy() to avoid use-after-free on errors in rbd_img_request_fill(..., OBJ_REQUEST_PAGES, ...), where pages, owned by the caller, gets freed in rbd_img_request_fill(). Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: David Disseldorp --- drivers/block/rbd.c | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 02e9a0f0bf7b..e46f4f05fb01 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2147,7 +2147,9 @@ static void rbd_obj_request_destroy(struct kref *kref) bio_chain_put(obj_request->bio_list); break; case OBJ_REQUEST_PAGES: - if (obj_request->pages) + /* img_data requests don't own their page array */ + if (obj_request->pages && + !obj_request_img_data_test(obj_request)) ceph_release_page_vector(obj_request->pages, obj_request->page_count); break; @@ -2368,13 +2370,6 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) xferred = obj_request->length; } - /* Image object requests don't own their page array */ - - if (obj_request->type == OBJ_REQUEST_PAGES) { - obj_request->pages = NULL; - obj_request->page_count = 0; - } - if (img_request_child_test(img_request)) { rbd_assert(img_request->obj_request != NULL); more = obj_request->which < img_request->obj_request_count - 1; -- cgit v1.2.3 From 0dcc685e7dd7190dcaa5435e9c14150f1d405b7b Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 26 Sep 2016 15:43:52 +0200 Subject: rbd: add rbd_obj_request_error() helper Pull setting an error and marking a request done code into a new helper. obj_request_img_data_test() check isn't strictly needed right now, but makes it applicable to !img_data requests and a bit safer. Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index e46f4f05fb01..c842b8911936 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1819,6 +1819,22 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) complete_all(&obj_request->completion); } +static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err) +{ + obj_request->result = err; + obj_request->xferred = 0; + /* + * kludge - mirror rbd_obj_request_submit() to match a put in + * rbd_img_obj_callback() + */ + if (obj_request_img_data_test(obj_request)) { + WARN_ON(obj_request->callback != rbd_img_obj_callback); + rbd_img_request_get(obj_request->img_request); + } + obj_request_done_set(obj_request); + rbd_obj_request_complete(obj_request); +} + static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request = NULL; @@ -2722,11 +2738,7 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) out_err: ceph_release_page_vector(pages, page_count); - orig_request->result = img_result; - orig_request->xferred = 0; - rbd_img_request_get(orig_request->img_request); - obj_request_done_set(orig_request); - rbd_obj_request_complete(orig_request); + rbd_obj_request_error(orig_request, img_result); } /* @@ -2877,11 +2889,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) return; fail_orig_request: - orig_request->result = result; - orig_request->xferred = 0; - rbd_img_request_get(orig_request->img_request); - obj_request_done_set(orig_request); - rbd_obj_request_complete(orig_request); + rbd_obj_request_error(orig_request, result); } static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) -- cgit v1.2.3 From 88a25a5fa09dff62b5fc1e82fb9c0c6b23971887 Mon Sep 17 00:00:00 2001 From: Markus Elfring Date: Sun, 11 Sep 2016 12:21:25 +0200 Subject: rbd: use kmalloc_array() in rbd_header_from_disk() * A multiplication for the size determination of a memory allocation indicated that an array data structure should be processed. Thus use the corresponding function "kmalloc_array". This issue was detected by using the Coccinelle software. * Delete the local variable "size" which became unnecessary with this refactoring. Signed-off-by: Markus Elfring Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'drivers/block') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index c842b8911936..aacbc832c1a2 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -986,7 +986,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, char *snap_names = NULL; u64 *snap_sizes = NULL; u32 snap_count; - size_t size; int ret = -ENOMEM; u32 i; @@ -1024,9 +1023,9 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, goto out_err; /* ...as well as the array of their sizes. */ - - size = snap_count * sizeof (*header->snap_sizes); - snap_sizes = kmalloc(size, GFP_KERNEL); + snap_sizes = kmalloc_array(snap_count, + sizeof(*header->snap_sizes), + GFP_KERNEL); if (!snap_sizes) goto out_err; -- cgit v1.2.3