diff options
author | Stephen Rothwell <sfr@canb.auug.org.au> | 2010-05-27 10:53:44 +1000 |
---|---|---|
committer | Stephen Rothwell <sfr@canb.auug.org.au> | 2010-05-27 10:53:44 +1000 |
commit | cbae0ccb2614d03e6365c16201d109014875e319 (patch) | |
tree | 7a87999e631e43fa34261235266d4e6c860818a8 /fs | |
parent | 12061bd19c69def8b81176892044a3c98bc59ede (diff) | |
parent | 3a6e756908487ca0ec1d201c389e823c557de863 (diff) |
Merge remote branch 'ceph/for-next'
Diffstat (limited to 'fs')
-rw-r--r-- | fs/ceph/Kconfig | 10 | ||||
-rw-r--r-- | fs/ceph/Makefile | 2 | ||||
-rw-r--r-- | fs/ceph/README | 1 | ||||
-rw-r--r-- | fs/ceph/debugfs.c | 11 | ||||
-rw-r--r-- | fs/ceph/file.c | 46 | ||||
-rw-r--r-- | fs/ceph/messenger.c | 188 | ||||
-rw-r--r-- | fs/ceph/messenger.h | 3 | ||||
-rw-r--r-- | fs/ceph/mon_client.c | 173 | ||||
-rw-r--r-- | fs/ceph/mon_client.h | 5 | ||||
-rw-r--r-- | fs/ceph/osd_client.c | 207 | ||||
-rw-r--r-- | fs/ceph/osd_client.h | 30 | ||||
-rw-r--r-- | fs/ceph/osdmap.c | 13 | ||||
-rw-r--r-- | fs/ceph/osdmap.h | 2 | ||||
-rw-r--r-- | fs/ceph/rbd.c | 1803 | ||||
-rw-r--r-- | fs/ceph/rbd.h | 8 | ||||
-rw-r--r-- | fs/ceph/rbd_types.h | 48 | ||||
-rw-r--r-- | fs/ceph/super.c | 191 | ||||
-rw-r--r-- | fs/ceph/super.h | 36 |
18 files changed, 2624 insertions, 153 deletions
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig index 04b8280582a9..4c49d18303db 100644 --- a/fs/ceph/Kconfig +++ b/fs/ceph/Kconfig @@ -25,3 +25,13 @@ config CEPH_FS_PRETTYDEBUG If unsure, say N. +config CEPH_RBD + bool "Rados block device (RBD)" + depends on CEPH_FS + select CONFIG_BLOCK + default y + help + If you say Y here, ceph will include rbd, the RADOS block + device which stripes a block device over objects stored in + the Ceph distributed object store. + diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile index 6a660e610be8..d2f6326c01dc 100644 --- a/fs/ceph/Makefile +++ b/fs/ceph/Makefile @@ -18,6 +18,8 @@ ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \ auth_x.o \ ceph_fs.o ceph_strings.o ceph_hash.o ceph_frag.o +obj-$(CONFIG_RBD) += rbd.o + else #Otherwise we were called directly from the command # line; invoke the kernel build system. diff --git a/fs/ceph/README b/fs/ceph/README index 18352fab37c0..e2046ec8e54a 100644 --- a/fs/ceph/README +++ b/fs/ceph/README @@ -7,6 +7,7 @@ src/include/ceph_fs.h fs/ceph/ceph_fs.h src/include/ceph_fs.cc fs/ceph/ceph_fs.c src/include/msgr.h fs/ceph/msgr.h src/include/rados.h fs/ceph/rados.h +src/include/rbd_types.h fs/ceph/rbd_types.h src/include/ceph_strings.cc fs/ceph/ceph_strings.c src/include/ceph_frag.h fs/ceph/ceph_frag.h src/include/ceph_frag.cc fs/ceph/ceph_frag.c diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 3be33fb066cc..c5d6a01d1214 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -440,9 +440,14 @@ int ceph_debugfs_client_init(struct ceph_client *client) if (!client->debugfs_congestion_kb) goto out; - sprintf(name, "../../bdi/%s", dev_name(client->sb->s_bdi->dev)); - client->debugfs_bdi = debugfs_create_symlink("bdi", client->debugfs_dir, - name); + if (client->backing_dev_info.dev) { + sprintf(name, "../../bdi/%s", + dev_name(client->backing_dev_info.dev)); + client->debugfs_bdi = + debugfs_create_symlink("bdi", + client->debugfs_dir, + name); + } return 0; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 6512b6701b9e..d7392fdf4fcc 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -363,6 +363,52 @@ static int copy_user_to_page_vector(struct page **pages, return len; } +int ceph_copy_to_page_vector(struct page **pages, + const char *data, + loff_t off, size_t len) +{ + int i = 0; + size_t po = off & ~PAGE_CACHE_MASK; + size_t left = len; + size_t l; + + while (left > 0) { + l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + memcpy(page_address(pages[i]) + po, data, l); + data += l; + left -= l; + po += l; + if (po == PAGE_CACHE_SIZE) { + po = 0; + i++; + } + } + return len; +} + +int ceph_copy_from_page_vector(struct page **pages, + char *data, + loff_t off, size_t len) +{ + int i = 0; + size_t po = off & ~PAGE_CACHE_MASK; + size_t left = len; + size_t l; + + while (left > 0) { + l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + memcpy(data, page_address(pages[i]) + po, l); + data += l; + left -= l; + po += l; + if (po == PAGE_CACHE_SIZE) { + po = 0; + i++; + } + } + return len; +} + /* * copy user data from a page vector into a user pointer */ diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 60b74839ebec..2b99e930d2ff 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c @@ -9,6 +9,8 @@ #include <linux/slab.h> #include <linux/socket.h> #include <linux/string.h> +#include <linux/bio.h> +#include <linux/blkdev.h> #include <net/tcp.h> #include "super.h" @@ -533,8 +535,11 @@ static void prepare_write_message(struct ceph_connection *con) if (le32_to_cpu(m->hdr.data_len) > 0) { /* initialize page iterator */ con->out_msg_pos.page = 0; - con->out_msg_pos.page_pos = - le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK; + if (m->pages) + con->out_msg_pos.page_pos = + le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK; + else + con->out_msg_pos.page_pos = 0; con->out_msg_pos.data_pos = 0; con->out_msg_pos.did_page_crc = 0; con->out_more = 1; /* data + footer will follow */ @@ -716,6 +721,31 @@ out: return ret; /* done! */ } +#ifdef CONFIG_BLOCK +static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) +{ + if (!bio) { + *iter = NULL; + *seg = 0; + return; + } + *iter = bio; + *seg = bio->bi_idx; +} + +static void iter_bio_next(struct bio **bio_iter, int *seg) +{ + if (*bio_iter == NULL) + return; + + BUG_ON(*seg >= (*bio_iter)->bi_vcnt); + + (*seg)++; + if (*seg == (*bio_iter)->bi_vcnt) + init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); +} +#endif + /* * Write as much message data payload as we can. If we finish, queue * up the footer. @@ -735,9 +765,16 @@ static int write_partial_msg_pages(struct ceph_connection *con) con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, con->out_msg_pos.page_pos); - while (con->out_msg_pos.page < con->out_msg->nr_pages) { +#ifdef CONFIG_BLOCK + if (msg->bio && !msg->bio_iter) + init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); +#endif + + while (data_len - con->out_msg_pos.data_pos > 0) { struct page *page = NULL; void *kaddr = NULL; + int max_write = PAGE_SIZE; + int page_shift = 0; /* * if we are calculating the data crc (the default), we need @@ -753,13 +790,24 @@ static int write_partial_msg_pages(struct ceph_connection *con) struct page, lru); if (crc) kaddr = kmap(page); +#ifdef CONFIG_BLOCK + } else if (msg->bio) { + struct bio_vec *bv; + + bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg); + page = bv->bv_page; + page_shift = bv->bv_offset; + if (crc) + kaddr = kmap(page) + page_shift; + max_write = bv->bv_len; +#endif } else { page = con->msgr->zero_page; if (crc) kaddr = page_address(con->msgr->zero_page); } - len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), - (int)(data_len - con->out_msg_pos.data_pos)); + len = min_t(int, max_write - con->out_msg_pos.page_pos, + data_len - con->out_msg_pos.data_pos); if (crc && !con->out_msg_pos.did_page_crc) { void *base = kaddr + con->out_msg_pos.page_pos; u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); @@ -771,11 +819,12 @@ static int write_partial_msg_pages(struct ceph_connection *con) } ret = kernel_sendpage(con->sock, page, - con->out_msg_pos.page_pos, len, + con->out_msg_pos.page_pos + page_shift, + len, MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE); - if (crc && (msg->pages || msg->pagelist)) + if (crc && (msg->pages || msg->pagelist || msg->bio)) kunmap(page); if (ret <= 0) @@ -790,6 +839,10 @@ static int write_partial_msg_pages(struct ceph_connection *con) if (msg->pagelist) list_move_tail(&page->lru, &msg->pagelist->head); +#ifdef CONFIG_BLOCK + if (msg->bio) + iter_bio_next(&msg->bio_iter, &msg->bio_seg); +#endif } } @@ -1296,8 +1349,7 @@ static int read_partial_message_section(struct ceph_connection *con, struct kvec *section, unsigned int sec_len, u32 *crc) { - int left; - int ret; + int ret, left; BUG_ON(!section); @@ -1320,13 +1372,83 @@ static int read_partial_message_section(struct ceph_connection *con, static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip); + + +static int read_partial_message_pages(struct ceph_connection *con, + struct page **pages, + unsigned data_len, int datacrc) +{ + void *p; + int ret; + int left; + + left = min((int)(data_len - con->in_msg_pos.data_pos), + (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); + /* (page) data */ + BUG_ON(pages == NULL); + p = kmap(pages[con->in_msg_pos.page]); + ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, + left); + if (ret > 0 && datacrc) + con->in_data_crc = + crc32c(con->in_data_crc, + p + con->in_msg_pos.page_pos, ret); + kunmap(pages[con->in_msg_pos.page]); + if (ret <= 0) + return ret; + con->in_msg_pos.data_pos += ret; + con->in_msg_pos.page_pos += ret; + if (con->in_msg_pos.page_pos == PAGE_SIZE) { + con->in_msg_pos.page_pos = 0; + con->in_msg_pos.page++; + } + + return ret; +} + +#ifdef CONFIG_BLOCK +static int read_partial_message_bio(struct ceph_connection *con, + struct bio **bio_iter, int *bio_seg, + unsigned data_len, int datacrc) +{ + struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg); + void *p; + int ret, left; + + if (IS_ERR(bv)) + return PTR_ERR(bv); + + left = min((int)(data_len - con->in_msg_pos.data_pos), + (int)(bv->bv_len - con->in_msg_pos.page_pos)); + + p = kmap(bv->bv_page) + bv->bv_offset; + + ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, + left); + if (ret > 0 && datacrc) + con->in_data_crc = + crc32c(con->in_data_crc, + p + con->in_msg_pos.page_pos, ret); + kunmap(bv->bv_page); + if (ret <= 0) + return ret; + con->in_msg_pos.data_pos += ret; + con->in_msg_pos.page_pos += ret; + if (con->in_msg_pos.page_pos == bv->bv_len) { + con->in_msg_pos.page_pos = 0; + iter_bio_next(bio_iter, bio_seg); + } + + return ret; +} +#endif + /* * read (part of) a message. */ static int read_partial_message(struct ceph_connection *con) { struct ceph_msg *m = con->in_msg; - void *p; int ret; int to, left; unsigned front_len, middle_len, data_len, data_off; @@ -1411,7 +1533,10 @@ static int read_partial_message(struct ceph_connection *con) m->middle->vec.iov_len = 0; con->in_msg_pos.page = 0; - con->in_msg_pos.page_pos = data_off & ~PAGE_MASK; + if (m->pages) + con->in_msg_pos.page_pos = data_off & ~PAGE_MASK; + else + con->in_msg_pos.page_pos = 0; con->in_msg_pos.data_pos = 0; } @@ -1428,27 +1553,29 @@ static int read_partial_message(struct ceph_connection *con) if (ret <= 0) return ret; } +#ifdef CONFIG_BLOCK + if (m->bio && !m->bio_iter) + init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); +#endif /* (page) data */ while (con->in_msg_pos.data_pos < data_len) { - left = min((int)(data_len - con->in_msg_pos.data_pos), - (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); - BUG_ON(m->pages == NULL); - p = kmap(m->pages[con->in_msg_pos.page]); - ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, - left); - if (ret > 0 && datacrc) - con->in_data_crc = - crc32c(con->in_data_crc, - p + con->in_msg_pos.page_pos, ret); - kunmap(m->pages[con->in_msg_pos.page]); - if (ret <= 0) - return ret; - con->in_msg_pos.data_pos += ret; - con->in_msg_pos.page_pos += ret; - if (con->in_msg_pos.page_pos == PAGE_SIZE) { - con->in_msg_pos.page_pos = 0; - con->in_msg_pos.page++; + if (m->pages) { + ret = read_partial_message_pages(con, m->pages, + data_len, datacrc); + if (ret <= 0) + return ret; +#ifdef CONFIG_BLOCK + } else if (m->bio) { + + ret = read_partial_message_bio(con, + &m->bio_iter, &m->bio_seg, + data_len, datacrc); + if (ret <= 0) + return ret; +#endif + } else { + BUG_ON(1); } } @@ -2124,6 +2251,9 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) m->nr_pages = 0; m->pages = NULL; m->pagelist = NULL; + m->bio = NULL; + m->bio_iter = NULL; + m->bio_seg = 0; dout("ceph_msg_new %p front %d\n", m, front_len); return m; diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index 00a9430b1ffc..9db220a7858d 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h @@ -82,6 +82,9 @@ struct ceph_msg { struct ceph_pagelist *pagelist; /* instead of pages */ struct list_head list_head; struct kref kref; + struct bio *bio; /* instead of pages/pagelist */ + struct bio *bio_iter; /* bio iterator */ + int bio_seg; /* current bio segment */ bool front_is_vmalloc; bool more_to_follow; bool needs_out_seq; diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c index f6510a476e7e..5d2a07739275 100644 --- a/fs/ceph/mon_client.c +++ b/fs/ceph/mon_client.c @@ -349,7 +349,7 @@ out: } /* - * statfs + * generic requests (e.g., statfs, poolop) */ static struct ceph_mon_generic_request *__lookup_generic_req( struct ceph_mon_client *monc, u64 tid) @@ -440,6 +440,35 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con, return m; } +static int do_generic_request(struct ceph_mon_client *monc, + struct ceph_mon_generic_request *req) +{ + int err; + + /* register request */ + mutex_lock(&monc->mutex); + req->tid = ++monc->last_tid; + req->request->hdr.tid = cpu_to_le64(req->tid); + __insert_generic_request(monc, req); + monc->num_generic_requests++; + ceph_con_send(monc->con, ceph_msg_get(req->request)); + mutex_unlock(&monc->mutex); + + err = wait_for_completion_interruptible(&req->completion); + + mutex_lock(&monc->mutex); + rb_erase(&req->node, &monc->generic_request_tree); + monc->num_generic_requests--; + mutex_unlock(&monc->mutex); + + if (!err) + err = req->result; + return err; +} + +/* + * statfs + */ static void handle_statfs_reply(struct ceph_mon_client *monc, struct ceph_msg *msg) { @@ -466,7 +495,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc, return; bad: - pr_err("corrupt generic reply, no tid\n"); + pr_err("corrupt generic reply, tid %llu\n", tid); ceph_msg_dump(msg); } @@ -485,6 +514,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) kref_init(&req->kref); req->buf = buf; + req->buf_len = sizeof(*buf); init_completion(&req->completion); err = -ENOMEM; @@ -502,33 +532,134 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) h->monhdr.session_mon_tid = 0; h->fsid = monc->monmap->fsid; - /* register request */ - mutex_lock(&monc->mutex); - req->tid = ++monc->last_tid; - req->request->hdr.tid = cpu_to_le64(req->tid); - __insert_generic_request(monc, req); - monc->num_generic_requests++; - mutex_unlock(&monc->mutex); + err = do_generic_request(monc, req); - /* send request and wait */ - ceph_con_send(monc->con, ceph_msg_get(req->request)); - err = wait_for_completion_interruptible(&req->completion); +out: + kref_put(&req->kref, release_generic_request); + return err; +} + +/* + * pool ops + */ +static int get_poolop_reply_buf(const char *src, size_t src_len, + char *dst, size_t dst_len) +{ + u32 buf_len; + + if (src_len != sizeof(u32) + dst_len) + return -EINVAL; + + buf_len = le32_to_cpu(*(u32 *)src); + if (buf_len != dst_len) + return -EINVAL; + + memcpy(dst, src + sizeof(u32), dst_len); + return 0; +} + +static void handle_poolop_reply(struct ceph_mon_client *monc, + struct ceph_msg *msg) +{ + struct ceph_mon_generic_request *req; + struct ceph_mon_poolop_reply *reply = msg->front.iov_base; + u64 tid = le64_to_cpu(msg->hdr.tid); + + if (msg->front.iov_len < sizeof(*reply)) + goto bad; + dout("handle_poolop_reply %p tid %llu\n", msg, tid); mutex_lock(&monc->mutex); - rb_erase(&req->node, &monc->generic_request_tree); - monc->num_generic_requests--; + req = __lookup_generic_req(monc, tid); + if (req) { + if (req->buf_len && + get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply), + msg->front.iov_len - sizeof(*reply), + req->buf, req->buf_len) < 0) { + mutex_unlock(&monc->mutex); + goto bad; + } + req->result = le32_to_cpu(reply->reply_code); + get_generic_request(req); + } mutex_unlock(&monc->mutex); + if (req) { + complete(&req->completion); + put_generic_request(req); + } + return; - if (!err) - err = req->result; +bad: + pr_err("corrupt generic reply, tid %llu\n", tid); + ceph_msg_dump(msg); +} + +/* + * Do a synchronous pool op. + */ +int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op, + u32 pool, u64 snapid, + char *buf, int len) +{ + struct ceph_mon_generic_request *req; + struct ceph_mon_poolop *h; + int err; + + req = kzalloc(sizeof(*req), GFP_NOFS); + if (!req) + return -ENOMEM; + + kref_init(&req->kref); + req->buf = buf; + req->buf_len = len; + init_completion(&req->completion); + + err = -ENOMEM; + req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS); + if (!req->request) + goto out; + req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS); + if (!req->reply) + goto out; + + /* fill out request */ + req->request->hdr.version = cpu_to_le16(2); + h = req->request->front.iov_base; + h->monhdr.have_version = 0; + h->monhdr.session_mon = cpu_to_le16(-1); + h->monhdr.session_mon_tid = 0; + h->fsid = monc->monmap->fsid; + h->pool = cpu_to_le32(pool); + h->op = cpu_to_le32(op); + h->auid = 0; + h->snapid = cpu_to_le64(snapid); + h->name_len = 0; + + err = do_generic_request(monc, req); out: kref_put(&req->kref, release_generic_request); return err; } +int ceph_monc_create_snapid(struct ceph_mon_client *monc, + u32 pool, u64 *snapid) +{ + return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, + pool, 0, (char *)snapid, sizeof(*snapid)); + +} + +int ceph_monc_delete_snapid(struct ceph_mon_client *monc, + u32 pool, u64 snapid) +{ + return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, + pool, snapid, 0, 0); + +} + /* - * Resend pending statfs requests. + * Resend pending generic requests. */ static void __resend_generic_request(struct ceph_mon_client *monc) { @@ -777,12 +908,17 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) handle_statfs_reply(monc, msg); break; + case CEPH_MSG_POOLOP_REPLY: + handle_poolop_reply(monc, msg); + break; + case CEPH_MSG_MON_MAP: ceph_monc_handle_map(monc, msg); break; case CEPH_MSG_MDS_MAP: - ceph_mdsc_handle_map(&monc->client->mdsc, msg); + if (monc->client->have_mdsc) + ceph_mdsc_handle_map(&monc->client->mdsc, msg); break; case CEPH_MSG_OSD_MAP: @@ -814,6 +950,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, case CEPH_MSG_MON_SUBSCRIBE_ACK: m = ceph_msg_get(monc->m_subscribe_ack); break; + case CEPH_MSG_POOLOP_REPLY: case CEPH_MSG_STATFS_REPLY: return get_generic_reply(con, hdr, skip); case CEPH_MSG_AUTH_REPLY: diff --git a/fs/ceph/mon_client.h b/fs/ceph/mon_client.h index 174d794321d0..8e396f2c0963 100644 --- a/fs/ceph/mon_client.h +++ b/fs/ceph/mon_client.h @@ -50,6 +50,7 @@ struct ceph_mon_generic_request { struct rb_node node; int result; void *buf; + int buf_len; struct completion completion; struct ceph_msg *request; /* original request */ struct ceph_msg *reply; /* and reply */ @@ -111,6 +112,10 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc); extern int ceph_monc_validate_auth(struct ceph_mon_client *monc); +extern int ceph_monc_create_snapid(struct ceph_mon_client *monc, + u32 pool, u64 *snapid); +extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc, + u32 pool, u64 snapid); #endif diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index afa7bb3895c4..6535de717230 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c @@ -6,6 +6,9 @@ #include <linux/pagemap.h> #include <linux/slab.h> #include <linux/uaccess.h> +#ifdef CONFIG_BLOCK +#include <linux/bio.h> +#endif #include "super.h" #include "osd_client.h" @@ -22,6 +25,35 @@ static int __kick_requests(struct ceph_osd_client *osdc, static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); +void ceph_calc_raw_layout(struct ceph_osd_client *osdc, + struct ceph_file_layout *layout, + u64 snapid, + u64 off, u64 len, u64 *bno, + struct ceph_osd_request *req) +{ + struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; + struct ceph_osd_op *op = (void *)(reqhead + 1); + u64 orig_len = len; + u64 objoff, objlen; /* extent in object */ + + reqhead->snapid = cpu_to_le64(snapid); + + /* object extent? */ + ceph_calc_file_object_mapping(layout, off, &len, bno, + &objoff, &objlen); + if (len < orig_len) + dout(" skipping last %llu, final file extent %llu~%llu\n", + orig_len - len, off, len); + + op->extent.offset = cpu_to_le64(objoff); + op->extent.length = cpu_to_le64(objlen); + req->r_num_pages = calc_pages_for(off, len); + + dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", + *bno, objoff, objlen, req->r_num_pages); + +} + /* * Implement client access to distributed object storage cluster. * @@ -48,34 +80,17 @@ static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); * fill osd op in request message. */ static void calc_layout(struct ceph_osd_client *osdc, - struct ceph_vino vino, struct ceph_file_layout *layout, + struct ceph_vino vino, + struct ceph_file_layout *layout, u64 off, u64 *plen, struct ceph_osd_request *req) { - struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; - struct ceph_osd_op *op = (void *)(reqhead + 1); - u64 orig_len = *plen; - u64 objoff, objlen; /* extent in object */ u64 bno; - reqhead->snapid = cpu_to_le64(vino.snap); - - /* object extent? */ - ceph_calc_file_object_mapping(layout, off, plen, &bno, - &objoff, &objlen); - if (*plen < orig_len) - dout(" skipping last %llu, final file extent %llu~%llu\n", - orig_len - *plen, off, *plen); + ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req); sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno); req->r_oid_len = strlen(req->r_oid); - - op->extent.offset = cpu_to_le64(objoff); - op->extent.length = cpu_to_le64(objlen); - req->r_num_pages = calc_pages_for(off, *plen); - - dout("calc_layout %s (%d) %llu~%llu (%d pages)\n", - req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages); } /* @@ -101,6 +116,10 @@ void ceph_osdc_release_request(struct kref *kref) if (req->r_own_pages) ceph_release_page_vector(req->r_pages, req->r_num_pages); +#ifdef CONFIG_BLOCK + if (req->r_bio) + bio_put(req->r_bio); +#endif ceph_put_snap_context(req->r_snapc); if (req->r_mempool) mempool_free(req, req->r_osdc->req_mempool); @@ -108,43 +127,35 @@ void ceph_osdc_release_request(struct kref *kref) kfree(req); } -/* - * build new request AND message, calculate layout, and adjust file - * extent as needed. - * - * if the file was recently truncated, we include information about its - * old and new size so that the object can be updated appropriately. (we - * avoid synchronously deleting truncated objects because it's slow.) - * - * if @do_sync, include a 'startsync' command so that the osd will flush - * data quickly. - */ -struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, - struct ceph_file_layout *layout, - struct ceph_vino vino, - u64 off, u64 *plen, - int opcode, int flags, +struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, + int flags, struct ceph_snap_context *snapc, int do_sync, - u32 truncate_seq, - u64 truncate_size, - struct timespec *mtime, - bool use_mempool, int num_reply) + bool use_mempool, + gfp_t gfp_flags, + struct page **pages, + struct bio *bio) { struct ceph_osd_request *req; struct ceph_msg *msg; - struct ceph_osd_request_head *head; - struct ceph_osd_op *op; - void *p; int num_op = 1 + do_sync; - size_t msg_size = sizeof(*head) + num_op*sizeof(*op); - int i; + size_t msg_size = sizeof(struct ceph_osd_request_head) + + num_op*sizeof(struct ceph_osd_op); + + if (use_mempool) { + req = mempool_alloc(osdc->req_mempool, gfp_flags); + memset(req, 0, sizeof(*req)); + } else { + req = kzalloc(sizeof(*req), gfp_flags); + } + if (!req) + return NULL; if (use_mempool) { - req = mempool_alloc(osdc->req_mempool, GFP_NOFS); + req = mempool_alloc(osdc->req_mempool, gfp_flags); memset(req, 0, sizeof(*req)); } else { - req = kzalloc(sizeof(*req), GFP_NOFS); + req = kzalloc(sizeof(*req), gfp_flags); } if (req == NULL) return NULL; @@ -164,7 +175,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); else msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, - OSD_OPREPLY_FRONT_LEN, GFP_NOFS); + OSD_OPREPLY_FRONT_LEN, gfp_flags); if (!msg) { ceph_osdc_put_request(req); return NULL; @@ -178,18 +189,54 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op, 0); else - msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS); + msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags); if (!msg) { ceph_osdc_put_request(req); return NULL; } msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); memset(msg->front.iov_base, 0, msg->front.iov_len); + + req->r_request = msg; + req->r_pages = pages; +#ifdef CONFIG_BLOCK + if (bio) { + req->r_bio = bio; + bio_get(req->r_bio); + } +#endif + + return req; +} + +/* + * build new request AND message + * + */ +void ceph_osdc_build_request(struct ceph_osd_request *req, + u64 off, u64 *plen, + int opcode, + struct ceph_snap_context *snapc, + int do_sync, + u32 truncate_seq, + u64 truncate_size, + struct timespec *mtime, + const char *oid, + int oid_len) +{ + struct ceph_msg *msg = req->r_request; + struct ceph_osd_request_head *head; + struct ceph_osd_op *op; + void *p; + int num_op = 1 + do_sync; + size_t msg_size = sizeof(*head) + num_op*sizeof(*op); + int i; + int flags = req->r_flags; + head = msg->front.iov_base; op = (void *)(head + 1); p = (void *)(op + num_op); - req->r_request = msg; req->r_snapc = ceph_get_snap_context(snapc); head->client_inc = cpu_to_le32(1); /* always, for now. */ @@ -199,10 +246,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, head->num_ops = cpu_to_le16(num_op); op->op = cpu_to_le16(opcode); - /* calculate max write size */ - calc_layout(osdc, vino, layout, off, plen, req); - req->r_file_layout = *layout; /* keep a copy */ - if (flags & CEPH_OSD_FLAG_WRITE) { req->r_request->hdr.data_off = cpu_to_le16(off); req->r_request->hdr.data_len = cpu_to_le32(*plen); @@ -212,9 +255,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, op->extent.truncate_seq = cpu_to_le32(truncate_seq); /* fill in oid */ - head->object_len = cpu_to_le32(req->r_oid_len); - memcpy(p, req->r_oid, req->r_oid_len); - p += req->r_oid_len; + head->object_len = cpu_to_le32(oid_len); + memcpy(p, oid, oid_len); + p += oid_len; if (do_sync) { op++; @@ -233,6 +276,50 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, msg_size = p - msg->front.iov_base; msg->front.iov_len = msg_size; msg->hdr.front_len = cpu_to_le32(msg_size); + return; +} + +/* + * build new request AND message, calculate layout, and adjust file + * extent as needed. + * + * if the file was recently truncated, we include information about its + * old and new size so that the object can be updated appropriately. (we + * avoid synchronously deleting truncated objects because it's slow.) + * + * if @do_sync, include a 'startsync' command so that the osd will flush + * data quickly. + */ +struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, + struct ceph_file_layout *layout, + struct ceph_vino vino, + u64 off, u64 *plen, + int opcode, int flags, + struct ceph_snap_context *snapc, + int do_sync, + u32 truncate_seq, + u64 truncate_size, + struct timespec *mtime, + bool use_mempool, int num_reply) +{ + struct ceph_osd_request *req = + ceph_osdc_alloc_request(osdc, flags, + snapc, do_sync, + use_mempool, + GFP_NOFS, NULL, NULL); + if (IS_ERR(req)) + return req; + + /* calculate max write size */ + calc_layout(osdc, vino, layout, off, plen, req); + req->r_file_layout = *layout; /* keep a copy */ + + ceph_osdc_build_request(req, off, plen, opcode, + snapc, do_sync, + truncate_seq, truncate_size, + mtime, + req->r_oid, req->r_oid_len); + return req; } @@ -1099,6 +1186,9 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, req->r_request->pages = req->r_pages; req->r_request->nr_pages = req->r_num_pages; +#ifdef CONFIG_BLOCK + req->r_request->bio = req->r_bio; +#endif register_request(osdc, req); @@ -1417,6 +1507,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, } m->pages = req->r_pages; m->nr_pages = req->r_num_pages; +#ifdef CONFIG_BLOCK + m->bio = req->r_bio; +#endif } *skip = 0; req->r_con_filling_msg = ceph_con_get(con); diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h index ce776989ef6a..2daa3fcae63e 100644 --- a/fs/ceph/osd_client.h +++ b/fs/ceph/osd_client.h @@ -68,6 +68,7 @@ struct ceph_osd_request { struct list_head r_unsafe_item; struct inode *r_inode; /* for use by callbacks */ + void *r_priv; /* ditto */ char r_oid[40]; /* object name */ int r_oid_len; @@ -80,6 +81,9 @@ struct ceph_osd_request { struct page **r_pages; /* pages for data payload */ int r_pages_from_pool; int r_own_pages; /* if true, i own page list */ +#ifdef CONFIG_BLOCK + struct bio *r_bio; /* instead of pages */ +#endif }; struct ceph_osd_client { @@ -119,6 +123,32 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg); +extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc, + struct ceph_file_layout *layout, + u64 snapid, + u64 off, u64 len, u64 *bno, + struct ceph_osd_request *req); + +extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, + int flags, + struct ceph_snap_context *snapc, + int do_sync, + bool use_mempool, + gfp_t gfp_flags, + struct page **pages, + struct bio *bio); + +extern void ceph_osdc_build_request(struct ceph_osd_request *req, + u64 off, u64 *plen, + int opcode, + struct ceph_snap_context *snapc, + int do_sync, + u32 truncate_seq, + u64 truncate_size, + struct timespec *mtime, + const char *oid, + int oid_len); + extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, struct ceph_file_layout *layout, struct ceph_vino vino, diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c index cfdd8f4388b7..97463b8e6ae6 100644 --- a/fs/ceph/osdmap.c +++ b/fs/ceph/osdmap.c @@ -417,6 +417,19 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id) return NULL; } +int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name) +{ + struct rb_node *rbp; + + for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) { + struct ceph_pg_pool_info *pi = + rb_entry(rbp, struct ceph_pg_pool_info, node); + if (pi->name && strcmp(pi->name, name) == 0) + return pi->id; + } + return -ENOENT; +} + static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi) { rb_erase(&pi->node, root); diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h index 970b547e510d..a592b211be39 100644 --- a/fs/ceph/osdmap.h +++ b/fs/ceph/osdmap.h @@ -125,4 +125,6 @@ extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid); +extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name); + #endif diff --git a/fs/ceph/rbd.c b/fs/ceph/rbd.c new file mode 100644 index 000000000000..a745f3f3481c --- /dev/null +++ b/fs/ceph/rbd.c @@ -0,0 +1,1803 @@ +/* + rbd.c -- Export ceph rados objects as a Linux block device + + + based on drivers/block/osdblk.c: + + Copyright 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to + the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. + + + + Instructions for use + -------------------- + + 1) Map a Linux block device to an existing rbd image. + + Usage: <mon ip addr> <options> <pool name> <rbd image name> + + $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add + + + 2) List all active blkdev<->object mappings. + + In this example, we have performed step #1 twice, creating two blkdevs, + mapped to two separate rados objects in the rados rbd pool + + $ cat /sys/class/rbd/list + 0 254 rbd foo + 1 253 rbd bar + + The columns, in order, are: + - blkdev unique id + - blkdev assigned major + - rados pool name + - rados block device name + + + 3) Remove an active blkdev<->rbd image mapping. + + In this example, we remove the mapping with blkdev unique id 1. + + $ echo 1 > /sys/class/rbd/remove + + + NOTE: The actual creation and deletion of rados objects is outside the scope + of this driver. + + */ + +#include "super.h" +#include "osd_client.h" +#include "rbd_types.h" +#include "mon_client.h" + +#include <linux/kernel.h> +#include <linux/device.h> +#include <linux/module.h> +#include <linux/fs.h> +#include <linux/blkdev.h> + +#define DRV_NAME "rbd" +#define DRV_NAME_LONG "rbd (rados block device)" + +enum { + RBD_MINORS_PER_MAJOR = 256, /* max minors per blkdev */ +}; + +#define RBD_MAX_POOL_NAME_SIZE 64 + +#define RBD_STRIPE_UNIT (1 << 22) + +#define RBD_MAX_OPT_LEN 1024 +#define RBD_MAX_SNAP_NAME_LEN 32 + +#define RBD_SNAP_OP_CREATE 0x1 +#define RBD_SNAP_OP_SET 0x2 + +#define RBD_SNAP_HEAD_NAME "head" + +struct rbd_obj_header { + u64 image_size; + __u8 obj_order; + __u8 crypt_type; + __u8 comp_type; + struct rw_semaphore snap_rwsem; + struct ceph_snap_context *snapc; + size_t snap_names_len; + u32 snap_seq; + u32 total_snaps; + + char *snap_names; + u64 *snap_sizes; +}; + +struct rbd_request { + struct request *rq; /* blk layer request */ + struct bio *bio; /* cloned bio */ + struct page **pages; /* list of used pages */ + u64 len; +}; + +struct rbd_client_node { + struct ceph_client *client; + const char *opt; + struct kref kref; + struct list_head node; +}; + +#define DEV_NAME_LEN 32 + +struct rbd_device { + int id; /* blkdev unique id */ + + int major; /* blkdev assigned major */ + struct gendisk *disk; /* blkdev's gendisk and rq */ + struct request_queue *q; + + struct ceph_client *client; /* associated OSD */ + + char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ + + spinlock_t lock; /* queue lock */ + + struct rbd_obj_header header; + char obj[RBD_MAX_OBJ_NAME_SIZE]; /* rbd image name */ + int obj_len; + char pool_name[RBD_MAX_POOL_NAME_SIZE]; + int poolid; + + u32 cur_snap; /* index+1 of current snapshot within snap context + 0 - for the head */ + int read_only; + + struct list_head node; + struct rbd_client_node *client_node; +}; + +static spinlock_t node_lock; /* protects client get/put */ + +static struct class *class_rbd; /* /sys/class/rbd */ +static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ +static LIST_HEAD(rbddev_list); +static LIST_HEAD(node_list); + + +static int rbd_open(struct block_device *bdev, fmode_t mode) +{ + struct gendisk *disk = bdev->bd_disk; + struct rbd_device *rbd_dev = disk->private_data; + + if (mode & FMODE_WRITE && rbd_dev->read_only) + return -EROFS; + + return 0; +} + +static const struct block_device_operations rbd_bd_ops = { + .owner = THIS_MODULE, + .open = rbd_open, +}; + +/* + * Initialize ceph client for a specific device. + */ +static int rbd_init_client(struct rbd_device *rbd_dev, + struct ceph_mount_args *args) +{ + struct ceph_osd_client *osdc; + int ret; + dout("rbd_init_device\n"); + rbd_dev->client = ceph_create_client(args, 0); + if (IS_ERR(rbd_dev->client)) + return PTR_ERR(rbd_dev->client); + + ret = ceph_open_session(rbd_dev->client); + if (ret < 0) + goto done_err; + + osdc = &rbd_dev->client->osdc; + ret = ceph_pg_poolid_by_name(osdc->osdmap, + rbd_dev->pool_name); + if (ret < 0) + goto done_err; + + rbd_dev->poolid = ret; + return 0; + +done_err: + ceph_destroy_client(rbd_dev->client); + rbd_dev->client = NULL; + return ret; +} + +/* + * Find a ceph client with specific addr and configuration. + */ +static struct rbd_client_node *__get_client_node(struct ceph_mount_args *args) +{ + struct rbd_client_node *client_node; + + if (args->flags & CEPH_OPT_NOSHARE) + return NULL; + + list_for_each_entry(client_node, &node_list, node) + if (ceph_compare_mount_args(args, client_node->client) == 0) + return client_node; + return NULL; +} + +/* + * Get a ceph client with specific addr and configuration, if one does + * not exist create it. + */ +static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, + char *opt) +{ + struct rbd_client_node *client_node; + struct ceph_mount_args *args; + int ret; + + + args = parse_mount_args(0, opt, mon_addr, NULL); + if (IS_ERR(args)) + return PTR_ERR(args); + + spin_lock(&node_lock); + + client_node = __get_client_node(args); + if (client_node) { + ceph_destroy_mount_args(args); + + kref_get(&client_node->kref); + rbd_dev->client_node = client_node; + rbd_dev->client = client_node->client; + spin_unlock(&node_lock); + return 0; + } + + spin_unlock(&node_lock); + + ret = -ENOMEM; + client_node = kmalloc(sizeof(struct rbd_client_node), GFP_KERNEL); + if (!client_node) + goto out_args; + + ret = rbd_init_client(rbd_dev, args); + if (ret < 0) + goto out_free; + + client_node->client = rbd_dev->client; + client_node->opt = kstrdup(opt, GFP_KERNEL); + kref_init(&client_node->kref); + INIT_LIST_HEAD(&client_node->node); + + rbd_dev->client_node = client_node; + + spin_lock(&node_lock); + list_add_tail(&client_node->node, &node_list); + spin_unlock(&node_lock); + + return 0; + +out_free: + kfree(client_node); +out_args: + ceph_destroy_mount_args(args); + return ret; +} + +/* + * Destroy ceph client + */ +static void rbd_release_client(struct kref *kref) +{ + struct rbd_client_node *node = + container_of(kref, struct rbd_client_node, kref); + + dout("rbd_release_client\n"); + + spin_lock(&node_lock); + list_del(&node->node); + spin_unlock(&node_lock); + + ceph_destroy_client(node->client); + kfree(node->opt); + kfree(node); +} + +/* + * Drop reference to ceph client node. If it's not referenced anymore, release + * it. + */ +static void rbd_put_client(struct rbd_device *rbd_dev) +{ + if (!rbd_dev->client_node) + return; + + kref_put(&rbd_dev->client_node->kref, rbd_release_client); + rbd_dev->client_node = NULL; +} + +static int snap_index(struct rbd_obj_header *header, int snap_num) +{ + return header->total_snaps - snap_num; +} + +static u64 cur_snap_id(struct rbd_device *rbd_dev) +{ + struct rbd_obj_header *header = &rbd_dev->header; + + if (!rbd_dev->cur_snap) + return 0; + + return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)]; +} + + +/* + * Create a new header structure, translate header format from the on-disk + * header. + */ +static int rbd_header_from_disk(struct rbd_obj_header *header, + struct rbd_obj_header_ondisk *ondisk, + int allocated_snaps, + gfp_t gfp_flags) +{ + int i; + u32 snap_count = le32_to_cpu(ondisk->snap_count); + int ret = -ENOMEM; + + init_rwsem(&header->snap_rwsem); + + header->snap_names_len = le64_to_cpu(ondisk->snap_names_len); + header->snapc = kmalloc(sizeof(struct ceph_snap_context) + + snap_count * + sizeof(struct rbd_obj_snap_ondisk), + gfp_flags); + if (!header->snapc) + return -ENOMEM; + if (snap_count) { + header->snap_names = kmalloc(header->snap_names_len, + GFP_KERNEL); + if (!header->snap_names) + goto err_snapc; + header->snap_sizes = kmalloc(snap_count * sizeof(u64), + GFP_KERNEL); + if (!header->snap_sizes) + goto err_names; + } else { + header->snap_names = NULL; + header->snap_sizes = NULL; + } + + header->image_size = le64_to_cpu(ondisk->image_size); + header->obj_order = ondisk->obj_order; + header->crypt_type = ondisk->crypt_type; + header->comp_type = ondisk->comp_type; + + atomic_set(&header->snapc->nref, 1); + header->snap_seq = le32_to_cpu(ondisk->snap_seq); + header->snapc->num_snaps = snap_count; + header->total_snaps = snap_count; + + if (snap_count && + allocated_snaps == snap_count) { + for (i = 0; i < snap_count; i++) { + header->snapc->snaps[i] = + le64_to_cpu(ondisk->snaps[i].id); + header->snap_sizes[i] = + le64_to_cpu(ondisk->snaps[i].image_size); + } + + /* copy snapshot names */ + memcpy(header->snap_names, &ondisk->snaps[i], + header->snap_names_len); + } + + return 0; + +err_names: + kfree(header->snap_names); +err_snapc: + kfree(header->snapc); + return ret; +} + +/* + * Create a new header structure, translate header format from the on-disk + * header. + */ +static int rbd_header_to_disk(struct rbd_obj_header_ondisk **ondisk, + struct rbd_obj_header_ondisk *old_ondisk, + struct rbd_obj_header *header, + gfp_t gfp_flags) +{ + int i; + + down_read(&header->snap_rwsem); + *ondisk = kmalloc(sizeof(struct rbd_obj_header_ondisk) + + header->snap_names_len + + header->total_snaps * + sizeof(struct rbd_obj_snap_ondisk), + gfp_flags); + if (!*ondisk) + return -ENOMEM; + + memcpy(*ondisk, old_ondisk, sizeof(*old_ondisk)); + + (*ondisk)->snap_seq = cpu_to_le32(header->snap_seq); + (*ondisk)->snap_count = cpu_to_le32(header->total_snaps); + (*ondisk)->snap_names_len = cpu_to_le64(header->snap_names_len); + + if (header->total_snaps) { + for (i = 0; i < header->total_snaps; i++) { + (*ondisk)->snaps[i].id = + cpu_to_le64(header->snapc->snaps[i]); + (*ondisk)->snaps[i].image_size = + cpu_to_le64(header->snap_sizes[i]); + } + + /* copy snapshot names */ + memcpy(&(*ondisk)->snaps[i], header->snap_names, + header->snap_names_len); + } + up_read(&header->snap_rwsem); + + return 0; +} + +static int rbd_header_add_snap(struct rbd_device *dev, + const char *snap_name, + gfp_t gfp_flags) +{ + struct rbd_obj_header *header = &dev->header; + struct ceph_snap_context *new_snapc; + char *p; + int name_len = strlen(snap_name); + u64 *snaps = header->snapc->snaps; + u64 *new_sizes; + char *new_names; + u64 new_snapid; + int i; + int ret = -EINVAL; + + down_write(&header->snap_rwsem); + + /* we can create a snapshot only if we're pointing at the head */ + if (dev->cur_snap) + goto done; + + ret = -EEXIST; + p = header->snap_names; + for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) { + if (strcmp(snap_name, p) == 0) + goto done; + } + + ret = -ENOMEM; + new_snapc = kmalloc(sizeof(struct rbd_obj_header) + + (header->total_snaps + 1) * sizeof(u64), + gfp_flags); + if (!new_snapc) + goto done; + new_names = kmalloc(header->snap_names_len + name_len + 1, gfp_flags); + if (!new_names) + goto err_snapc; + new_sizes = kmalloc((header->total_snaps + 1) * sizeof(u64), + gfp_flags); + if (!new_sizes) + goto err_names; + + atomic_set(&new_snapc->nref, 1); + new_snapc->num_snaps = header->total_snaps + 1; + if (header->total_snaps) + memcpy(&new_snapc->snaps[1], snaps, + (header->total_snaps) * sizeof(u64)); + + ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid, + &new_snapid); + dout("created snapid=%lld\n", new_snapid); + if (ret < 0) + goto err_sizes; + + new_snapc->seq = new_snapid; /* we're still pointing at the head */ + header->snap_seq = new_snapid; + new_snapc->snaps[0] = new_snapid; + + /* copy snap names */ + if (header->snap_names) + memcpy(new_names + name_len + 1, header->snap_names, + header->snap_names_len); + + memcpy(new_names, snap_name, name_len + 1); + header->snap_names_len += name_len + 1; + + /* copy snap image sizes */ + if (header->snap_sizes) + memcpy(new_sizes, header->snap_sizes, + header->total_snaps * sizeof(u64)); + new_sizes[new_snapc->num_snaps - 1] = header->image_size; + + header->total_snaps = new_snapc->num_snaps; + + kfree(header->snapc); + header->snapc = new_snapc; + kfree(header->snap_names); + header->snap_names = new_names; + kfree(header->snap_sizes); + header->snap_sizes = new_sizes; + + ret = 0; +done: + up_write(&header->snap_rwsem); + return ret; +err_sizes: + kfree(new_sizes); +err_names: + kfree(new_names); +err_snapc: + kfree(new_snapc); + up_write(&header->snap_rwsem); + return ret; +} + +static int rbd_header_set_snap(struct rbd_device *dev, + const char *snap_name, + u64 *size) +{ + struct rbd_obj_header *header = &dev->header; + struct ceph_snap_context *snapc = header->snapc; + char *p; + int i; + int ret = -ENOENT; + + down_write(&header->snap_rwsem); + + if (!snap_name || + !*snap_name || + strcmp(snap_name, "-") == 0 || + strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) { + if (header->total_snaps) + snapc->seq = header->snap_seq; + else + snapc->seq = 0; + dev->cur_snap = 0; + dev->read_only = 0; + if (size) + *size = header->image_size; + } else { + p = header->snap_names; + for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) { + if (strcmp(snap_name, p) == 0) + break; + } + if (i == header->total_snaps) + goto done; + + snapc->seq = snapc->snaps[i]; + dev->cur_snap = header->total_snaps - i; + dev->read_only = 1; + if (size) + *size = header->snap_sizes[i]; + } + + ret = 0; +done: + up_write(&header->snap_rwsem); + return ret; +} + +static void rbd_header_free(struct rbd_obj_header *header) +{ + kfree(header->snapc); + kfree(header->snap_names); + kfree(header->snap_sizes); +} + +/* + * get the actual striped segment name, offset and length + */ +static u64 rbd_get_segment(struct rbd_obj_header *header, + const char *obj_name, + u64 ofs, u64 len, + char *seg_name, u64 *segofs) +{ + u64 seg = ofs >> header->obj_order; + + if (seg_name) + snprintf(seg_name, RBD_MAX_SEG_NAME_SIZE, + "%s.%012llx", obj_name, seg); + + ofs = ofs & ((1 << header->obj_order) - 1); + len = min_t(u64, len, (1 << header->obj_order) - ofs); + + if (segofs) + *segofs = ofs; + + return len; +} + +static void bio_chain_put(struct bio *chain) +{ + struct bio *tmp; + + while (chain) { + tmp = chain; + chain = chain->bi_next; + + bio_put(tmp); + } +} + +/* + * zeros a bio chain, starting at specific offset + */ +static void zero_bio_chain(struct bio *chain, int start_ofs) +{ + struct bio_vec *bv; + unsigned long flags; + void *buf; + int i; + int pos = 0; + + while (chain) { + bio_for_each_segment(bv, chain, i) { + if (pos + bv->bv_len > start_ofs) { + int remainder = max(start_ofs - pos, 0); + buf = bvec_kmap_irq(bv, &flags); + memset(buf + remainder, 0, + bv->bv_len - remainder); + bvec_kunmap_irq(bv, &flags); + } + pos += bv->bv_len; + } + + chain = chain->bi_next; + } +} + +/* + * bio_chain_clone - clone a chain of bios up to a certain length. + * might return a bio_pair that will need to be released. + */ +static struct bio *bio_chain_clone(struct bio **old, struct bio **next, + struct bio_pair **bp, + int len, gfp_t gfpmask) +{ + struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL; + int total = 0; + + if (*bp) { + bio_pair_release(*bp); + *bp = NULL; + } + + while (old_chain && (total < len)) { + tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); + if (!tmp) + goto err_out; + + if (total + old_chain->bi_size > len) { + struct bio_pair *bp; + + /* + * this split can only happen with a single paged bio, + * split_bio will BUG_ON if this is not the case + */ + dout("bio_chain_clone split! total=%d remaining=%d" + "bi_size=%d\n", + (int)total, (int)len-total, + (int)old_chain->bi_size); + + /* split the bio. We'll release it either in the next + call, or it will have to be released outside */ + bp = bio_split(old_chain, (len - total) / 512ULL); + if (!bp) + goto err_out; + + __bio_clone(tmp, &bp->bio1); + + *next = &bp->bio2; + } else { + __bio_clone(tmp, old_chain); + *next = old_chain->bi_next; + } + + tmp->bi_bdev = NULL; + gfpmask &= ~__GFP_WAIT; + tmp->bi_next = NULL; + + if (!new_chain) { + new_chain = tail = tmp; + } else { + tail->bi_next = tmp; + tail = tmp; + } + old_chain = old_chain->bi_next; + + total += tmp->bi_size; + } + + BUG_ON(total < len); + + if (tail) + tail->bi_next = NULL; + + *old = old_chain; + + return new_chain; + +err_out: + dout("bio_chain_clone with err\n"); + bio_chain_put(new_chain); + return NULL; +} + +/* + * Send ceph osd request + */ +static int rbd_do_request(struct request *rq, + struct rbd_device *dev, + struct ceph_snap_context *snapc, + u64 snapid, + const char *obj, u64 ofs, u64 len, + struct bio *bio, + struct page **pages, + int num_pages, + int opcode, int flags, + int num_reply, + void (*rbd_cb)(struct ceph_osd_request *req, + struct ceph_msg *msg)) +{ + struct ceph_osd_request *req; + struct ceph_file_layout *layout; + int ret; + u64 bno; + struct timespec mtime = CURRENT_TIME; + struct rbd_request *req_data; + struct ceph_osd_request_head *reqhead; + struct rbd_obj_header *header = &dev->header; + + ret = -ENOMEM; + req_data = kzalloc(sizeof(*req_data), GFP_NOFS); + if (!req_data) + goto done; + + dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs); + + down_read(&header->snap_rwsem); + + req = ceph_osdc_alloc_request(&dev->client->osdc, flags, + snapc, 0, + false, + GFP_NOFS, pages, bio); + if (IS_ERR(req)) { + up_read(&header->snap_rwsem); + ret = PTR_ERR(req); + goto done_pages; + } + + req->r_callback = rbd_cb; + + req_data->rq = rq; + req_data->bio = bio; + req_data->pages = pages; + req_data->len = len; + + req->r_priv = req_data; + + reqhead = req->r_request->front.iov_base; + reqhead->snapid = cpu_to_le64(CEPH_NOSNAP); + + strncpy(req->r_oid, obj, sizeof(req->r_oid)); + req->r_oid_len = strlen(req->r_oid); + + layout = &req->r_file_layout; + memset(layout, 0, sizeof(*layout)); + layout->fl_stripe_unit = RBD_STRIPE_UNIT; + layout->fl_stripe_count = 1; + layout->fl_object_size = RBD_STRIPE_UNIT; + layout->fl_pg_preferred = -1; + layout->fl_pg_pool = dev->poolid; + ceph_calc_raw_layout(&dev->client->osdc, layout, snapid, + ofs, len, &bno, req); + + ceph_osdc_build_request(req, ofs, &len, opcode, + snapc, 0, + 0, 0, + &mtime, + req->r_oid, req->r_oid_len); + up_read(&header->snap_rwsem); + + ret = ceph_osdc_start_request(&dev->client->osdc, req, false); + if (ret < 0) + goto done_err; + + if (!rbd_cb) { + ret = ceph_osdc_wait_request(&dev->client->osdc, req); + ceph_osdc_put_request(req); + } + return ret; + +done_err: + bio_chain_put(req_data->bio); + ceph_osdc_put_request(req); +done_pages: + kfree(req_data); +done: + if (rq) + blk_end_request(rq, ret, len); + return ret; +} + +/* + * Ceph osd op callback + */ +static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) +{ + struct rbd_request *req_data = req->r_priv; + struct ceph_osd_reply_head *replyhead; + struct ceph_osd_op *op; + __s32 rc; + u64 bytes; + int read_op; + + /* parse reply */ + replyhead = msg->front.iov_base; + WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); + op = (void *)(replyhead + 1); + rc = le32_to_cpu(replyhead->result); + bytes = le64_to_cpu(op->extent.length); + + dout("rbd_req_cb bytes=%lld rc=%d\n", bytes, rc); + + read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ); + + if (rc == -ENOENT && read_op) { + zero_bio_chain(req_data->bio, 0); + rc = 0; + } else if (rc == 0 && read_op && bytes < req_data->len) { + zero_bio_chain(req_data->bio, bytes); + bytes = req_data->len; + } + + blk_end_request(req_data->rq, rc, bytes); + + if (req_data->bio) + bio_chain_put(req_data->bio); + + ceph_osdc_put_request(req); + kfree(req_data); +} + +/* + * Do a synchronous ceph osd operation + */ +static int rbd_req_sync_op(struct rbd_device *dev, + struct ceph_snap_context *snapc, + u64 snapid, + int opcode, int flags, + int num_reply, + const char *obj, + u64 ofs, u64 len, + char *buf) +{ + int ret; + struct page **pages; + int num_pages; + + num_pages = calc_pages_for(ofs , len); + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (!pages) + return -ENOMEM; + + if (flags & CEPH_OSD_FLAG_WRITE) { + ret = ceph_copy_to_page_vector(pages, buf, ofs, len); + if (ret < 0) + goto done; + } + + ret = rbd_do_request(NULL, dev, snapc, snapid, + obj, ofs, len, NULL, + pages, num_pages, + opcode, + flags, + 2, + NULL); + if (ret < 0) + goto done; + + if (flags & CEPH_OSD_FLAG_READ) + ret = ceph_copy_from_page_vector(pages, buf, ofs, ret); + +done: + ceph_release_page_vector(pages, num_pages); + return ret; +} + +/* + * Do an asynchronous ceph osd operation + */ +static int rbd_do_op(struct request *rq, + struct rbd_device *rbd_dev , + struct ceph_snap_context *snapc, + u64 snapid, + int opcode, int flags, int num_reply, + u64 ofs, u64 len, + struct bio *bio) +{ + char *seg_name; + u64 seg_ofs; + u64 seg_len; + int ret; + + seg_name = kmalloc(RBD_MAX_SEG_NAME_SIZE + 1, GFP_NOIO); + if (!seg_name) + return -ENOMEM; + + seg_len = rbd_get_segment(&rbd_dev->header, + rbd_dev->obj, + ofs, len, + seg_name, &seg_ofs); + if (seg_len < 0) + return seg_len; + + /* we've taken care of segment sizes earlier when we + cloned the bios. We should never have a segment + truncated at this point */ + BUG_ON(seg_len < len); + + ret = rbd_do_request(rq, rbd_dev, snapc, snapid, + seg_name, seg_ofs, seg_len, + bio, + NULL, 0, + opcode, + flags, + num_reply, + rbd_req_cb); + kfree(seg_name); + return ret; +} + +/* + * Request async osd write + */ +static int rbd_req_write(struct request *rq, + struct rbd_device *rbd_dev, + struct ceph_snap_context *snapc, + u64 ofs, u64 len, + struct bio *bio) +{ + return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, + CEPH_OSD_OP_WRITE, + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, + 2, + ofs, len, bio); +} + +/* + * Request sync osd write + */ +static int rbd_req_sync_write(struct rbd_device *dev, + struct ceph_snap_context *snapc, + u64 snapid, + const char *obj, + u64 ofs, u64 len, + char *buf) +{ + return rbd_req_sync_op(dev, snapc, snapid, + CEPH_OSD_OP_WRITE, + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, + 2, obj, ofs, len, buf); +} + +/* + * Request async osd read + */ +static int rbd_req_read(struct request *rq, + struct rbd_device *rbd_dev, + u64 snapid, + u64 ofs, u64 len, + struct bio *bio) +{ + return rbd_do_op(rq, rbd_dev, NULL, + (snapid ? snapid : CEPH_NOSNAP), + CEPH_OSD_OP_READ, + CEPH_OSD_FLAG_READ, + 2, + ofs, len, bio); +} + +/* + * Request sync osd read + */ +static int rbd_req_sync_read(struct rbd_device *dev, + struct ceph_snap_context *snapc, + u64 snapid, + const char *obj, + u64 ofs, u64 len, + char *buf) +{ + return rbd_req_sync_op(dev, NULL, + (snapid ? snapid : CEPH_NOSNAP), + CEPH_OSD_OP_READ, + CEPH_OSD_FLAG_READ, + 1, obj, ofs, len, buf); +} + +/* + * block device queue callback + */ +static void rbd_rq_fn(struct request_queue *q) +{ + struct rbd_device *rbd_dev = q->queuedata; + struct request *rq; + struct bio_pair *bp = NULL; + + rq = blk_fetch_request(q); + + while (1) { + struct bio *bio; + struct bio *rq_bio, *next_bio = NULL; + bool do_write; + int size, op_size = 0; + u64 ofs; + + /* peek at request from block layer */ + if (!rq) + break; + + dout("fetched request\n"); + + /* filter out block requests we don't understand */ + if (!blk_fs_request(rq) && !blk_barrier_rq(rq)) { + __blk_end_request_all(rq, 0); + goto next; + } + + /* deduce our operation (read, write) */ + do_write = (rq_data_dir(rq) == WRITE); + + size = blk_rq_bytes(rq); + ofs = blk_rq_pos(rq) * 512ULL; + rq_bio = rq->bio; + if (do_write && rbd_dev->read_only) { + __blk_end_request_all(rq, -EROFS); + goto next; + } + + spin_unlock_irq(q->queue_lock); + + dout("%s 0x%x bytes at 0x%llx\n", + do_write ? "write" : "read", + size, blk_rq_pos(rq) * 512ULL); + + do { + /* a bio clone to be passed down to OSD req */ + dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt); + op_size = rbd_get_segment(&rbd_dev->header, + rbd_dev->obj, + ofs, size, + NULL, NULL); + bio = bio_chain_clone(&rq_bio, &next_bio, &bp, + op_size, GFP_ATOMIC); + if (!bio) { + spin_lock_irq(q->queue_lock); + __blk_end_request_all(rq, -ENOMEM); + goto next; + } + + /* init OSD command: write or read */ + if (do_write) + rbd_req_write(rq, rbd_dev, + rbd_dev->header.snapc, + ofs, + op_size, bio); + else + rbd_req_read(rq, rbd_dev, + cur_snap_id(rbd_dev), + ofs, + op_size, bio); + + size -= op_size; + ofs += op_size; + + rq_bio = next_bio; + } while (size > 0); + + if (bp) + bio_pair_release(bp); + + spin_lock_irq(q->queue_lock); +next: + rq = blk_fetch_request(q); + } +} + +/* + * a queue callback. Makes sure that we don't create a bio that spans across + * multiple osd objects. One exception would be with a single page bios, + * which we handle later at bio_chain_clone + */ +static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, + struct bio_vec *bvec) +{ + sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev); + unsigned int chunk_sectors = (RBD_STRIPE_UNIT >> 9); + unsigned int bio_sectors = bmd->bi_size >> 9; + int max; + + max = (chunk_sectors - ((sector & (chunk_sectors - 1)) + + bio_sectors)) << 9; + if (max < 0) + max = 0; /* bio_add cannot handle a negative return */ + if (max <= bvec->bv_len && bio_sectors == 0) + return bvec->bv_len; + return max; +} + +static void rbd_free_disk(struct rbd_device *rbd_dev) +{ + struct gendisk *disk = rbd_dev->disk; + + if (!disk) + return; + + rbd_header_free(&rbd_dev->header); + + if (disk->flags & GENHD_FL_UP) + del_gendisk(disk); + if (disk->queue) + blk_cleanup_queue(disk->queue); + put_disk(disk); +} + +static char *rbd_alloc_md_name(struct rbd_device *rbd_dev, gfp_t gfp_flags) +{ + char *obj_md_name = kmalloc(strlen(rbd_dev->obj) + sizeof(RBD_SUFFIX), + gfp_flags); + if (!obj_md_name) + return NULL; + sprintf(obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX); + + return obj_md_name; +} + +static int rbd_read_header(struct rbd_device *rbd_dev, + struct rbd_obj_header *header) +{ + ssize_t rc; + char *obj_md_name; + struct rbd_obj_header_ondisk *dh; + int snap_count = 0; + u64 snap_names_len = 0; + + obj_md_name = rbd_alloc_md_name(rbd_dev, GFP_KERNEL); + if (!obj_md_name) + return -ENOMEM; + + while (1) { + int len = sizeof(*dh) + + snap_count * sizeof(struct rbd_obj_snap_ondisk) + + snap_names_len; + + rc = -ENOMEM; + dh = kmalloc(len, GFP_KERNEL); + if (!dh) + goto out_obj_md; + + rc = rbd_req_sync_read(rbd_dev, + NULL, CEPH_NOSNAP, + obj_md_name, + 0, len, + (char *)dh); + if (rc < 0) + goto out_dh; + + rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL); + if (rc < 0) + goto out_dh; + + if (snap_count != header->total_snaps) { + snap_count = header->total_snaps; + snap_names_len = header->snap_names_len; + rbd_header_free(header); + kfree(dh); + continue; + } + break; + } + +out_dh: + kfree(dh); +out_obj_md: + kfree(obj_md_name); + return rc; +} + +/* + * only read the first part of the ondisk header, without the snaps info + */ +static int rbd_read_ondisk_header_nosnap(struct rbd_device *rbd_dev, + struct rbd_obj_header *header, + struct rbd_obj_header_ondisk *dh) +{ + ssize_t rc; + char *obj_md_name; + int len; + + obj_md_name = rbd_alloc_md_name(rbd_dev, GFP_KERNEL); + if (!obj_md_name) + return -ENOMEM; + + len = sizeof(struct rbd_obj_header_ondisk); + + rc = rbd_req_sync_read(rbd_dev, + NULL, CEPH_NOSNAP, + obj_md_name, + 0, len, + (char *)dh); + if (rc > 0 && rc < len) + rc = -EIO; + + kfree(obj_md_name); + return rc; +} + +static int rbd_write_header(struct rbd_device *rbd_dev, + struct rbd_obj_header_ondisk *dh, + struct rbd_obj_header *header) +{ + ssize_t rc; + char *obj_md_name; + int snap_count = header->total_snaps; + u64 snap_names_len = header->snap_names_len; + int len; + + obj_md_name = rbd_alloc_md_name(rbd_dev, GFP_KERNEL); + if (!obj_md_name) + return -ENOMEM; + + len = sizeof(*dh) + + snap_count * sizeof(struct rbd_obj_snap_ondisk) + + snap_names_len; + + rc = rbd_req_sync_write(rbd_dev, + NULL, CEPH_NOSNAP, + obj_md_name, + 0, len, + (char *)dh); + kfree(obj_md_name); + return rc; +} + +static int rbd_update_snaps(struct rbd_device *rbd_dev) +{ + int ret; + struct rbd_obj_header h; + + ret = rbd_read_header(rbd_dev, &h); + if (ret < 0) + return ret; + + down_write(&rbd_dev->header.snap_rwsem); + + kfree(rbd_dev->header.snapc); + kfree(rbd_dev->header.snap_names); + kfree(rbd_dev->header.snap_sizes); + + rbd_dev->header.total_snaps = h.total_snaps; + rbd_dev->header.snapc = h.snapc; + rbd_dev->header.snap_names = h.snap_names; + rbd_dev->header.snap_sizes = h.snap_sizes; + + up_write(&rbd_dev->header.snap_rwsem); + + return 0; +} + +static int rbd_init_disk(struct rbd_device *rbd_dev) +{ + struct gendisk *disk; + struct request_queue *q; + int rc; + u64 total_size; + const char *snap = NULL; + + /* contact OSD, request size info about the object being mapped */ + rc = rbd_read_header(rbd_dev, &rbd_dev->header); + if (rc) + return rc; + + if (rbd_dev->client->mount_args) + snap = rbd_dev->client->mount_args->snap; + rc = rbd_header_set_snap(rbd_dev, snap, &total_size); + if (rc) + return rc; + + /* create gendisk info */ + rc = -ENOMEM; + disk = alloc_disk(RBD_MINORS_PER_MAJOR); + if (!disk) + goto out; + + sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id); + disk->major = rbd_dev->major; + disk->first_minor = 0; + disk->fops = &rbd_bd_ops; + disk->private_data = rbd_dev; + + /* init rq */ + rc = -ENOMEM; + q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock); + if (!q) + goto out_disk; + blk_queue_merge_bvec(q, rbd_merge_bvec); + disk->queue = q; + + q->queuedata = rbd_dev; + + rbd_dev->disk = disk; + rbd_dev->q = q; + + /* finally, announce the disk to the world */ + set_capacity(disk, total_size / 512ULL); + add_disk(disk); + + pr_info("%s: added with size 0x%llx\n", + disk->disk_name, (unsigned long long)total_size); + return 0; + +out_disk: + put_disk(disk); +out: + return rc; +} + +/******************************************************************** + * /sys/class/rbd/ + * add map rados objects to blkdev + * remove unmap rados objects + * list show mappings + *******************************************************************/ + +static void class_rbd_release(struct class *cls) +{ + kfree(cls); +} + +static ssize_t class_rbd_list(struct class *c, + struct class_attribute *attr, + char *data) +{ + int n = 0; + struct list_head *tmp; + + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); + + list_for_each(tmp, &rbddev_list) { + struct rbd_device *rbd_dev; + + rbd_dev = list_entry(tmp, struct rbd_device, node); + n += sprintf(data+n, "%d %d client%lld %s %s\n", + rbd_dev->id, + rbd_dev->major, + ceph_client_id(rbd_dev->client), + rbd_dev->pool_name, + rbd_dev->obj); + } + + mutex_unlock(&ctl_mutex); + return n; +} + +static ssize_t class_rbd_add(struct class *c, + struct class_attribute *attr, + const char *buf, size_t count) +{ + struct rbd_device *rbd_dev; + ssize_t rc = -ENOMEM; + int irc, new_id = 0; + struct list_head *tmp; + char *mon_dev_name; + char *opt; + + if (!try_module_get(THIS_MODULE)) + return -ENODEV; + + mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL); + if (!mon_dev_name) + goto err_out_mod; + + opt = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL); + if (!opt) + goto err_mon_dev; + + /* new rbd_device object */ + rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); + if (!rbd_dev) + goto err_out_opt; + + /* static rbd_device initialization */ + spin_lock_init(&rbd_dev->lock); + INIT_LIST_HEAD(&rbd_dev->node); + + /* generate unique id: find highest unique id, add one */ + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); + + list_for_each(tmp, &rbddev_list) { + struct rbd_device *rbd_dev; + + rbd_dev = list_entry(tmp, struct rbd_device, node); + if (rbd_dev->id >= new_id) + new_id = rbd_dev->id + 1; + } + + rbd_dev->id = new_id; + + /* add to global list */ + list_add_tail(&rbd_dev->node, &rbddev_list); + + /* parse add command */ + if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s " + "%" __stringify(RBD_MAX_OPT_LEN) "s " + "%" __stringify(RBD_MAX_POOL_NAME_SIZE) "s " + "%" __stringify(RBD_MAX_OBJ_NAME_SIZE) "s", + mon_dev_name, opt, rbd_dev->pool_name, + rbd_dev->obj) != 4) { + rc = -EINVAL; + goto err_out_slot; + } + + rbd_dev->obj_len = strlen(rbd_dev->obj); + + /* initialize rest of new object */ + snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id); + rc = rbd_get_client(rbd_dev, mon_dev_name, opt); + if (rc < 0) + goto err_out_slot; + + mutex_unlock(&ctl_mutex); + /* register our block device */ + irc = register_blkdev(0, rbd_dev->name); + if (irc < 0) { + rc = irc; + goto err_out_client; + } + rbd_dev->major = irc; + + /* set up and announce blkdev mapping */ + rc = rbd_init_disk(rbd_dev); + if (rc) + goto err_out_blkdev; + + return count; + +err_out_blkdev: + unregister_blkdev(rbd_dev->major, rbd_dev->name); +err_out_client: + rbd_put_client(rbd_dev); + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); +err_out_slot: + list_del_init(&rbd_dev->node); + mutex_unlock(&ctl_mutex); + + kfree(rbd_dev); +err_out_opt: + kfree(opt); +err_mon_dev: + kfree(mon_dev_name); +err_out_mod: + dout("Error adding device %s\n", buf); + module_put(THIS_MODULE); + return rc; +} + +static struct rbd_device *__rbd_get_dev(unsigned long id) +{ + struct list_head *tmp; + struct rbd_device *rbd_dev = NULL; + + list_for_each(tmp, &rbddev_list) { + rbd_dev = list_entry(tmp, struct rbd_device, node); + if (rbd_dev->id == id) + break; + + rbd_dev = NULL; + } + + return rbd_dev; +} + +static ssize_t class_rbd_remove(struct class *c, + struct class_attribute *attr, + const char *buf, + size_t count) +{ + struct rbd_device *rbd_dev = NULL; + int target_id, rc; + unsigned long ul; + + rc = strict_strtoul(buf, 10, &ul); + if (rc) + return rc; + + /* convert to int; abort if we lost anything in the conversion */ + target_id = (int) ul; + if (target_id != ul) + return -EINVAL; + + /* remove object from list immediately */ + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); + + rbd_dev = __rbd_get_dev(target_id); + if (rbd_dev) + list_del_init(&rbd_dev->node); + + mutex_unlock(&ctl_mutex); + + if (!rbd_dev) + return -ENOENT; + + rbd_put_client(rbd_dev); + + /* clean up and free blkdev and associated OSD connection */ + rbd_free_disk(rbd_dev); + unregister_blkdev(rbd_dev->major, rbd_dev->name); + kfree(rbd_dev); + + /* release module ref */ + module_put(THIS_MODULE); + + return count; +} + +static void get_size_and_suffix(u64 orig_size, u64 *size, char *suffix) +{ + if (orig_size >= 1024*1024*1024) { + *size = orig_size / (1024*1024*1024); + *suffix = 'G'; + } else if (orig_size >= 1024*1024) { + *size = orig_size / (1024*1024); + *suffix = 'M'; + } else if (orig_size >= 1024) { + *size = orig_size / 1024; + *suffix = 'K'; + } else { + *size = orig_size; + *suffix = ' '; + } +} + +static ssize_t class_rbd_snaps_list(struct class *c, + struct class_attribute *attr, + char *data) +{ + struct rbd_device *rbd_dev = NULL; + struct list_head *tmp; + struct rbd_obj_header *header; + char size_suffix; + u64 size; + int i, n = 0, max = PAGE_SIZE; + int ret; + + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); + + list_for_each(tmp, &rbddev_list) { + char *names, *p; + struct ceph_snap_context *snapc; + + rbd_dev = list_entry(tmp, struct rbd_device, node); + header = &rbd_dev->header; + names = header->snap_names; + snapc = header->snapc; + n += snprintf(data + n, max - n, + "snapshots for device id %d:\n", + rbd_dev->id); + if (n == max) + break; + + down_read(&header->snap_rwsem); + + get_size_and_suffix(header->image_size, &size, + &size_suffix); + n += snprintf(data + n, max - n, "%s\t%lld%c%s\n", + RBD_SNAP_HEAD_NAME, + size, size_suffix, + (!rbd_dev->cur_snap ? + " (*)" : "")); + if (n == max) + break; + + p = names; + for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) { + get_size_and_suffix(header->snap_sizes[i], &size, + &size_suffix); + n += snprintf(data + n, max - n, "%s\t%lld%c%s\n", + p, size, size_suffix, + (rbd_dev->cur_snap && + (snap_index(header, i) == rbd_dev->cur_snap) ? + " (*)" : "")); + if (n == max) + break; + } + + up_read(&header->snap_rwsem); + } + + + ret = n; + mutex_unlock(&ctl_mutex); + return ret; +} + +static ssize_t class_rbd_snaps_refresh(struct class *c, + struct class_attribute *attr, + const char *buf, + size_t count) +{ + struct rbd_device *rbd_dev = NULL; + int target_id, rc; + unsigned long ul; + int ret = count; + + rc = strict_strtoul(buf, 10, &ul); + if (rc) + return rc; + + /* convert to int; abort if we lost anything in the conversion */ + target_id = (int) ul; + if (target_id != ul) + return -EINVAL; + + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); + + rbd_dev = __rbd_get_dev(target_id); + if (!rbd_dev) { + ret = -ENOENT; + goto done; + } + + rc = rbd_update_snaps(rbd_dev); + if (rc < 0) + ret = rc; + +done: + mutex_unlock(&ctl_mutex); + return ret; +} + +static ssize_t class_rbd_snaps_op(struct class *c, + struct class_attribute *attr, + const char *buf, + size_t count, + int snaps_op) +{ + struct rbd_device *rbd_dev = NULL; + int target_id, ret; + char *name; + struct rbd_obj_header_ondisk old_ondisk, *new_ondisk; + + name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL); + if (!name) + return -ENOMEM; + + /* parse snaps add command */ + if (sscanf(buf, "%d " + "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s", + &target_id, + name) != 2) { + ret = -EINVAL; + goto done; + } + + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); + + rbd_dev = __rbd_get_dev(target_id); + if (!rbd_dev) { + ret = -ENOENT; + goto done_unlock; + } + + ret = rbd_read_ondisk_header_nosnap(rbd_dev, + &rbd_dev->header, + &old_ondisk); + if (ret < 0) + goto done_unlock; + + switch (snaps_op) { + case RBD_SNAP_OP_CREATE: + ret = rbd_header_add_snap(rbd_dev, + name, GFP_KERNEL); + break; + case RBD_SNAP_OP_SET: + ret = rbd_header_set_snap(rbd_dev, name, NULL); + break; + default: + ret = -EINVAL; + } + if (ret < 0) + goto done_unlock; + + ret = rbd_header_to_disk(&new_ondisk, &old_ondisk, + &rbd_dev->header, GFP_KERNEL); + if (ret < 0) + goto done_unlock; + + ret = rbd_write_header(rbd_dev, new_ondisk, &rbd_dev->header); + if (ret < 0) + goto done_unlock; + + ret = count; +done_unlock: + mutex_unlock(&ctl_mutex); +done: + kfree(name); + return ret; +} + +static ssize_t class_rbd_snap_create(struct class *c, + struct class_attribute *attr, + const char *buf, + size_t count) +{ + return class_rbd_snaps_op(c, attr, buf, count, + RBD_SNAP_OP_CREATE); +} + +static struct class_attribute class_rbd_attrs[] = { + __ATTR(add, 0200, NULL, class_rbd_add), + __ATTR(remove, 0200, NULL, class_rbd_remove), + __ATTR(list, 0444, class_rbd_list, NULL), + __ATTR(snaps_refresh, 0200, NULL, class_rbd_snaps_refresh), + __ATTR(snap_create, 0200, NULL, class_rbd_snap_create), + __ATTR(snaps_list, 0444, class_rbd_snaps_list, NULL), + __ATTR_NULL +}; + +/* + * create control files in sysfs + * /sys/class/rbd/... + */ +static int rbd_sysfs_init(void) +{ + int ret = -ENOMEM; + + class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL); + if (!class_rbd) + goto out; + + class_rbd->name = DRV_NAME; + class_rbd->owner = THIS_MODULE; + class_rbd->class_release = class_rbd_release; + class_rbd->class_attrs = class_rbd_attrs; + + ret = class_register(class_rbd); + if (ret) + goto out_class; + return 0; + +out_class: + kfree(class_rbd); + class_rbd = NULL; + pr_err(DRV_NAME ": failed to create class rbd\n"); +out: + return ret; +} + +static void rbd_sysfs_cleanup(void) +{ + if (class_rbd) + class_destroy(class_rbd); + class_rbd = NULL; +} + +int __init rbd_init(void) +{ + int rc; + + rc = rbd_sysfs_init(); + if (rc) + return rc; + spin_lock_init(&node_lock); + pr_info("loaded " DRV_NAME_LONG); + return 0; +} + +void __exit rbd_exit(void) +{ + rbd_sysfs_cleanup(); +} diff --git a/fs/ceph/rbd.h b/fs/ceph/rbd.h new file mode 100644 index 000000000000..68e0a5c69dc8 --- /dev/null +++ b/fs/ceph/rbd.h @@ -0,0 +1,8 @@ +#ifndef _FS_CEPH_RBD +#define _FS_CEPH_RBD + +extern void rbd_set_osdc(struct ceph_osd_client *o); +extern int __init rbd_init(void); +extern void __exit rbd_exit(void); + +#endif diff --git a/fs/ceph/rbd_types.h b/fs/ceph/rbd_types.h new file mode 100644 index 000000000000..b73ac12ad0d6 --- /dev/null +++ b/fs/ceph/rbd_types.h @@ -0,0 +1,48 @@ +#ifndef _FS_CEPH_RBD +#define _FS_CEPH_RBD + +#include <linux/types.h> + +/* + * rbd image 'foo' consists of objects + * foo.rbd - image metadata + * foo.00000000 + * foo.00000001 + * ... - data + */ + +#define RBD_SUFFIX ".rbd" +#define RBD_DIRECTORY "rbd_directory" + +#define RBD_DEFAULT_OBJ_ORDER 22 /* 4MB */ + +#define RBD_MAX_OBJ_NAME_SIZE 96 +#define RBD_MAX_SEG_NAME_SIZE 128 + +#define RBD_COMP_NONE 0 +#define RBD_CRYPT_NONE 0 + +static const char rbd_text[] = "<<< Rados Block Device Image >>>\n"; +static const char rbd_signature[] = "RBD"; +static const char rbd_version[] = "001.002"; + +struct rbd_obj_snap_ondisk { + __le64 id; + __le64 image_size; +} __attribute__((packed)); + +struct rbd_obj_header_ondisk { + char text[64]; + char signature[4]; + char version[8]; + __le64 image_size; + __u8 obj_order; + __u8 crypt_type; + __u8 comp_type; + __le64 snap_seq; + __le32 snap_count; + __le64 snap_names_len; + struct rbd_obj_snap_ondisk snaps[0]; +} __attribute__((packed)); + +#endif diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 7c663d9b9f81..510acdeae321 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -18,6 +18,7 @@ #include "super.h" #include "mon_client.h" #include "auth.h" +#include "rbd.h" /* * Ceph superblock operations @@ -342,6 +343,7 @@ enum { Opt_snapdirname, Opt_name, Opt_secret, + Opt_snap, Opt_last_string, /* string args above */ Opt_ip, @@ -374,6 +376,7 @@ static match_table_t arg_tokens = { {Opt_snapdirname, "snapdirname=%s"}, {Opt_name, "name=%s"}, {Opt_secret, "secret=%s"}, + {Opt_snap, "snap=%s"}, /* string args above */ {Opt_ip, "ip=%s"}, {Opt_noshare, "noshare"}, @@ -387,14 +390,15 @@ static match_table_t arg_tokens = { }; -static struct ceph_mount_args *parse_mount_args(int flags, char *options, - const char *dev_name, - const char **path) +struct ceph_mount_args *parse_mount_args(int flags, char *options, + const char *dev_name, + const char **path) { struct ceph_mount_args *args; const char *c; int err = -ENOMEM; substring_t argstr[MAX_OPT_ARGS]; + const char *end_path; args = kzalloc(sizeof(*args), GFP_KERNEL); if (!args) @@ -426,23 +430,29 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, err = -EINVAL; if (!dev_name) goto out; - *path = strstr(dev_name, ":/"); - if (*path == NULL) { - pr_err("device name is missing path (no :/ in %s)\n", - dev_name); - goto out; + + if (path) { + *path = strstr(dev_name, ":/"); + if (*path == NULL) { + pr_err("device name is missing path (no :/ in %s)\n", + dev_name); + goto out; + } + end_path = *path; + + /* path on server */ + *path += 2; + dout("server path '%s'\n", *path); + } else { + end_path = dev_name + strlen(dev_name); } /* get mon ip(s) */ - err = ceph_parse_ips(dev_name, *path, args->mon_addr, + err = ceph_parse_ips(dev_name, end_path, args->mon_addr, CEPH_MAX_MON, &args->num_mon); if (err < 0) goto out; - /* path on server */ - *path += 2; - dout("server path '%s'\n", *path); - /* parse mount options */ while ((c = strsep(&options, ",")) != NULL) { int token, intval, ret; @@ -501,6 +511,11 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, argstr[0].to-argstr[0].from, GFP_KERNEL); break; + case Opt_snap: + args->snap = kstrndup(argstr[0].from, + argstr[0].to-argstr[0].from, + GFP_KERNEL); + break; /* misc */ case Opt_wsize: @@ -569,22 +584,70 @@ out: return ERR_PTR(err); } -static void destroy_mount_args(struct ceph_mount_args *args) +void ceph_destroy_mount_args(struct ceph_mount_args *args) { dout("destroy_mount_args %p\n", args); kfree(args->snapdir_name); - args->snapdir_name = NULL; kfree(args->name); - args->name = NULL; kfree(args->secret); - args->secret = NULL; + kfree(args->snap); kfree(args); } +static int strcmp_null(const char *s1, const char *s2) +{ + if (!s1 && !s2) + return 0; + if (s1 && !s2) + return -1; + if (!s1 && s2) + return 1; + return strcmp(s1, s2); +} + +int ceph_compare_mount_args(struct ceph_mount_args *new_args, + struct ceph_client *client) +{ + struct ceph_mount_args *args1 = new_args; + struct ceph_mount_args *args2 = client->mount_args; + int ofs = offsetof(struct ceph_mount_args, mon_addr); + int i; + int ret; + + ret = memcmp(args1, args2, ofs); + if (ret) + return ret; + + ret = strcmp_null(args1->snapdir_name, args2->snapdir_name); + if (ret) + return ret; + + ret = strcmp_null(args1->name, args2->name); + if (ret) + return ret; + + ret = strcmp_null(args1->secret, args2->secret); + if (ret) + return ret; + + ret = strcmp_null(args1->snap, args2->snap); + if (ret) + return ret; + + for (i = 0; i < args1->num_mon; i++) { + if (ceph_monmap_contains(client->monc.monmap, + &args1->mon_addr[i])) + return 0; + } + + return -1; +} + /* * create a fresh client instance */ -static struct ceph_client *ceph_create_client(struct ceph_mount_args *args) +struct ceph_client *ceph_create_client(struct ceph_mount_args *args, + int need_mdsc) { struct ceph_client *client; int err = -ENOMEM; @@ -639,9 +702,13 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args) err = ceph_osdc_init(&client->osdc, client); if (err < 0) goto fail_monc; - err = ceph_mdsc_init(&client->mdsc, client); - if (err < 0) - goto fail_osdc; + if (need_mdsc) { + err = ceph_mdsc_init(&client->mdsc, client); + if (err < 0) + goto fail_osdc; + client->have_mdsc = 1; + } + return client; fail_osdc: @@ -663,7 +730,12 @@ fail: return ERR_PTR(err); } -static void ceph_destroy_client(struct ceph_client *client) +u64 ceph_client_id(struct ceph_client *client) +{ + return client->monc.auth->global_id; +} + +void ceph_destroy_client(struct ceph_client *client) { dout("destroy_client %p\n", client); @@ -685,7 +757,7 @@ static void ceph_destroy_client(struct ceph_client *client) ceph_messenger_destroy(client->msgr); mempool_destroy(client->wb_pagevec_pool); - destroy_mount_args(client->mount_args); + ceph_destroy_mount_args(client->mount_args); kfree(client); dout("destroy_client %p done\n", client); @@ -704,7 +776,7 @@ int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid) } } else { pr_info("client%lld fsid " FSID_FORMAT "\n", - client->monc.auth->global_id, PR_FSID(fsid)); + ceph_client_id(client), PR_FSID(fsid)); memcpy(&client->fsid, fsid, sizeof(*fsid)); ceph_debugfs_client_init(client); client->have_fsid = true; @@ -766,17 +838,12 @@ static struct dentry *open_root_dentry(struct ceph_client *client, /* * mount: join the ceph cluster, and open root directory. */ -static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, - const char *path) +static int __ceph_open_session(struct ceph_client *client, + unsigned long started) { struct ceph_entity_addr *myaddr = NULL; int err; unsigned long timeout = client->mount_args->mount_timeout * HZ; - unsigned long started = jiffies; /* note the start time */ - struct dentry *root; - - dout("mount start\n"); - mutex_lock(&client->mount_mutex); /* initialize the messenger */ if (client->msgr == NULL) { @@ -784,9 +851,8 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, myaddr = &client->mount_args->my_addr; client->msgr = ceph_messenger_create(myaddr); if (IS_ERR(client->msgr)) { - err = PTR_ERR(client->msgr); client->msgr = NULL; - goto out; + return PTR_ERR(client->msgr); } client->msgr->nocrc = ceph_test_opt(client, NOCRC); } @@ -794,26 +860,58 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, /* open session, and wait for mon, mds, and osd maps */ err = ceph_monc_open_session(&client->monc); if (err < 0) - goto out; + return err; while (!have_mon_and_osd_map(client)) { err = -EIO; if (timeout && time_after_eq(jiffies, started + timeout)) - goto out; + return err; /* wait */ dout("mount waiting for mon_map\n"); err = wait_event_interruptible_timeout(client->auth_wq, - have_mon_and_osd_map(client) || (client->auth_err < 0), - timeout); + have_mon_and_osd_map(client) || (client->auth_err < 0), + timeout); if (err == -EINTR || err == -ERESTARTSYS) - goto out; - if (client->auth_err < 0) { - err = client->auth_err; - goto out; - } + return err; + if (client->auth_err < 0) + return client->auth_err; } + return 0; +} + +int ceph_open_session(struct ceph_client *client) +{ + int ret; + unsigned long started = jiffies; /* note the start time */ + + dout("open_session start\n"); + mutex_lock(&client->mount_mutex); + + ret = __ceph_open_session(client, started); + + mutex_unlock(&client->mount_mutex); + return ret; +} + +/* + * mount: join the ceph cluster, and open root directory. + */ +static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, + const char *path) +{ + int err; + unsigned long started = jiffies; /* note the start time */ + struct dentry *root; + + dout("mount start\n"); + mutex_lock(&client->mount_mutex); + + err = __ceph_open_session(client, started); + if (err < 0) + goto out; + dout("mount opening root\n"); root = open_root_dentry(client, "", started); if (IS_ERR(root)) { @@ -955,7 +1053,7 @@ static int ceph_get_sb(struct file_system_type *fs_type, } /* create client (which we may/may not use) */ - client = ceph_create_client(args); + client = ceph_create_client(args, 1); if (IS_ERR(client)) { err = PTR_ERR(client); goto out_final; @@ -1045,8 +1143,14 @@ static int __init init_ceph(void) CEPH_MONC_PROTOCOL, CEPH_MDSC_PROTOCOL, CEPH_OSDC_PROTOCOL, CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT, CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT); + + ret = rbd_init(); + if (ret) + goto out_fs; return 0; +out_fs: + unregister_filesystem(&ceph_fs_type); out_icache: destroy_caches(); out_msgr: @@ -1060,6 +1164,7 @@ out: static void __exit exit_ceph(void) { dout("exit_ceph\n"); + rbd_exit(); unregister_filesystem(&ceph_fs_type); ceph_caps_finalize(); destroy_caches(); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 3725c9ee9d08..167572dbf469 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -49,14 +49,11 @@ #define ceph_test_opt(client, opt) \ (!!((client)->mount_args->flags & CEPH_OPT_##opt)) - struct ceph_mount_args { int sb_flags; int flags; struct ceph_fsid fsid; struct ceph_entity_addr my_addr; - int num_mon; - struct ceph_entity_addr *mon_addr; int mount_timeout; int osd_idle_ttl; int osd_timeout; @@ -68,9 +65,17 @@ struct ceph_mount_args { int cap_release_safety; int max_readdir; /* max readdir result (entires) */ int max_readdir_bytes; /* max readdir result (bytes) */ + + /* any type that can't be simply compared or doesn't need + need to be compared should go beyond this point, + ceph_compare_mount_args() should be updated accordingly */ + struct ceph_entity_addr *mon_addr; /* should be the first + pointer type of args */ + int num_mon; char *snapdir_name; /* default ".snap" */ char *name; char *secret; + char *snap; /* rbd snapshot */ }; /* @@ -140,6 +145,8 @@ struct ceph_client { int min_caps; /* min caps i added */ + int have_mdsc; + struct ceph_messenger *msgr; /* messenger instance */ struct ceph_mon_client monc; struct ceph_mds_client mdsc; @@ -738,6 +745,17 @@ extern struct kmem_cache *ceph_file_cachep; extern const char *ceph_msg_type_name(int type); extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid); +extern struct ceph_mount_args *parse_mount_args(int flags, char *options, + const char *dev_name, + const char **path); +extern void ceph_destroy_mount_args(struct ceph_mount_args *args); +extern int ceph_compare_mount_args(struct ceph_mount_args *new_args, + struct ceph_client *client); +extern struct ceph_client *ceph_create_client(struct ceph_mount_args *args, + int need_mdsc); +extern u64 ceph_client_id(struct ceph_client *client); +extern void ceph_destroy_client(struct ceph_client *client); +extern int ceph_open_session(struct ceph_client *client); #define FSID_FORMAT "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-" \ "%02x%02x%02x%02x%02x%02x" @@ -848,6 +866,13 @@ extern int ceph_mmap(struct file *file, struct vm_area_struct *vma); /* file.c */ extern const struct file_operations ceph_file_fops; extern const struct address_space_operations ceph_aops; +extern int ceph_copy_to_page_vector(struct page **pages, + const char *data, + loff_t off, size_t len); +extern int ceph_copy_from_page_vector(struct page **pages, + char *data, + loff_t off, size_t len); +extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags); extern int ceph_open(struct inode *inode, struct file *file); extern struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry, struct nameidata *nd, int mode, @@ -897,4 +922,9 @@ static inline struct inode *get_dentry_parent_inode(struct dentry *dentry) return NULL; } +#ifndef CONFIG_RBD +static inline int __init rbd_init(void) { return 0; } +static inline void __exit rbd_exit(void) {} +#endif + #endif /* _FS_CEPH_SUPER_H */ |