author	Stephen Rothwell <sfr@canb.auug.org.au>	2010-06-01 10:59:14 +1000
committer	Stephen Rothwell <sfr@canb.auug.org.au>	2010-06-01 10:59:14 +1000
commit	60c390e5477a5300d6262c8a08eed4d352d3056c (patch)
tree	847189f69fa521dca7e69ac288b498a05bce4cce
parent	83cd9370a3991d74df10640a2827fa8a04d660be (diff)
parent	3a6e756908487ca0ec1d201c389e823c557de863 (diff)
Merge remote branch 'ceph/for-next'
-rw-r--r--	fs/ceph/Kconfig	10
-rw-r--r--	fs/ceph/Makefile	2
-rw-r--r--	fs/ceph/README	1
-rw-r--r--	fs/ceph/debugfs.c	11
-rw-r--r--	fs/ceph/file.c	46
-rw-r--r--	fs/ceph/messenger.c	188
-rw-r--r--	fs/ceph/messenger.h	3
-rw-r--r--	fs/ceph/mon_client.c	173
-rw-r--r--	fs/ceph/mon_client.h	5
-rw-r--r--	fs/ceph/osd_client.c	207
-rw-r--r--	fs/ceph/osd_client.h	30
-rw-r--r--	fs/ceph/osdmap.c	13
-rw-r--r--	fs/ceph/osdmap.h	2
-rw-r--r--	fs/ceph/rbd.c	1803
-rw-r--r--	fs/ceph/rbd.h	8
-rw-r--r--	fs/ceph/rbd_types.h	48
-rw-r--r--	fs/ceph/super.c	191
-rw-r--r--	fs/ceph/super.h	36
18 files changed, 2624 insertions, 153 deletions
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index 04b8280582a9..4c49d18303db 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -25,3 +25,13 @@ config CEPH_FS_PRETTYDEBUG
If unsure, say N.
+config CEPH_RBD
+ bool "Rados block device (RBD)"
+ depends on CEPH_FS
+	select BLOCK
+ default y
+ help
+ If you say Y here, ceph will include rbd, the RADOS block
+ device which stripes a block device over objects stored in
+ the Ceph distributed object store.
+
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 6a660e610be8..d2f6326c01dc 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -18,6 +18,8 @@ ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
auth_x.o \
ceph_fs.o ceph_strings.o ceph_hash.o ceph_frag.o
+obj-$(CONFIG_CEPH_RBD) += rbd.o
+
else
#Otherwise we were called directly from the command
# line; invoke the kernel build system.
diff --git a/fs/ceph/README b/fs/ceph/README
index 18352fab37c0..e2046ec8e54a 100644
--- a/fs/ceph/README
+++ b/fs/ceph/README
@@ -7,6 +7,7 @@ src/include/ceph_fs.h fs/ceph/ceph_fs.h
src/include/ceph_fs.cc fs/ceph/ceph_fs.c
src/include/msgr.h fs/ceph/msgr.h
src/include/rados.h fs/ceph/rados.h
+src/include/rbd_types.h fs/ceph/rbd_types.h
src/include/ceph_strings.cc fs/ceph/ceph_strings.c
src/include/ceph_frag.h fs/ceph/ceph_frag.h
src/include/ceph_frag.cc fs/ceph/ceph_frag.c
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 3be33fb066cc..c5d6a01d1214 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -440,9 +440,14 @@ int ceph_debugfs_client_init(struct ceph_client *client)
if (!client->debugfs_congestion_kb)
goto out;
- sprintf(name, "../../bdi/%s", dev_name(client->sb->s_bdi->dev));
- client->debugfs_bdi = debugfs_create_symlink("bdi", client->debugfs_dir,
- name);
+ if (client->backing_dev_info.dev) {
+ sprintf(name, "../../bdi/%s",
+ dev_name(client->backing_dev_info.dev));
+ client->debugfs_bdi =
+ debugfs_create_symlink("bdi",
+ client->debugfs_dir,
+ name);
+ }
return 0;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 6251a1574b94..4c21b51c833a 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -363,6 +363,52 @@ static int copy_user_to_page_vector(struct page **pages,
return len;
}
+int ceph_copy_to_page_vector(struct page **pages,
+ const char *data,
+ loff_t off, size_t len)
+{
+ int i = 0;
+ size_t po = off & ~PAGE_CACHE_MASK;
+ size_t left = len;
+ size_t l;
+
+ while (left > 0) {
+ l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+ memcpy(page_address(pages[i]) + po, data, l);
+ data += l;
+ left -= l;
+ po += l;
+ if (po == PAGE_CACHE_SIZE) {
+ po = 0;
+ i++;
+ }
+ }
+ return len;
+}
+
+int ceph_copy_from_page_vector(struct page **pages,
+ char *data,
+ loff_t off, size_t len)
+{
+ int i = 0;
+ size_t po = off & ~PAGE_CACHE_MASK;
+ size_t left = len;
+ size_t l;
+
+ while (left > 0) {
+ l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+ memcpy(data, page_address(pages[i]) + po, l);
+ data += l;
+ left -= l;
+ po += l;
+ if (po == PAGE_CACHE_SIZE) {
+ po = 0;
+ i++;
+ }
+ }
+ return len;
+}
+
/*
* copy user data from a page vector into a user pointer
*/
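
The file.c hunk above adds ceph_copy_to_page_vector() and ceph_copy_from_page_vector(), kernel-buffer counterparts of the existing user-copy helpers. A minimal sketch of how they pair up, assuming the page-vector helpers the ceph client already provides (calc_pages_for, ceph_alloc_page_vector, ceph_release_page_vector) and with error handling trimmed; this is an illustrative sketch, not part of the patch:

static int example_page_vector_roundtrip(const char *src, char *dst,
					 u64 off, size_t len)
{
	int num_pages = calc_pages_for(off, len);
	struct page **pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	int ret;

	if (!pages)
		return -ENOMEM;

	/* stage a kernel buffer into the page vector ... */
	ret = ceph_copy_to_page_vector(pages, src, off, len);
	if (ret >= 0)
		/* ... and read it back out again */
		ret = ceph_copy_from_page_vector(pages, dst, off, len);

	ceph_release_page_vector(pages, num_pages);
	return ret;
}

rbd_req_sync_op() further down in this series uses the same staging pattern around a synchronous OSD request.
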
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 64b8b1f7863d..8785ca41becd 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -9,6 +9,8 @@
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
+#include <linux/bio.h>
+#include <linux/blkdev.h>
#include <net/tcp.h>
#include "super.h"
@@ -539,8 +541,11 @@ static void prepare_write_message(struct ceph_connection *con)
if (le32_to_cpu(m->hdr.data_len) > 0) {
/* initialize page iterator */
con->out_msg_pos.page = 0;
- con->out_msg_pos.page_pos =
- le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+ if (m->pages)
+ con->out_msg_pos.page_pos =
+ le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+ else
+ con->out_msg_pos.page_pos = 0;
con->out_msg_pos.data_pos = 0;
con->out_msg_pos.did_page_crc = 0;
con->out_more = 1; /* data + footer will follow */
@@ -722,6 +727,31 @@ out:
return ret; /* done! */
}
+#ifdef CONFIG_BLOCK
+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{
+ if (!bio) {
+ *iter = NULL;
+ *seg = 0;
+ return;
+ }
+ *iter = bio;
+ *seg = bio->bi_idx;
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{
+ if (*bio_iter == NULL)
+ return;
+
+ BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+
+ (*seg)++;
+ if (*seg == (*bio_iter)->bi_vcnt)
+ init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+#endif
+
/*
* Write as much message data payload as we can. If we finish, queue
* up the footer.
@@ -741,9 +771,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
con->out_msg_pos.page_pos);
- while (con->out_msg_pos.page < con->out_msg->nr_pages) {
+#ifdef CONFIG_BLOCK
+ if (msg->bio && !msg->bio_iter)
+ init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+#endif
+
+ while (data_len - con->out_msg_pos.data_pos > 0) {
struct page *page = NULL;
void *kaddr = NULL;
+ int max_write = PAGE_SIZE;
+ int page_shift = 0;
/*
* if we are calculating the data crc (the default), we need
@@ -759,13 +796,24 @@ static int write_partial_msg_pages(struct ceph_connection *con)
struct page, lru);
if (crc)
kaddr = kmap(page);
+#ifdef CONFIG_BLOCK
+ } else if (msg->bio) {
+ struct bio_vec *bv;
+
+ bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
+ page = bv->bv_page;
+ page_shift = bv->bv_offset;
+ if (crc)
+ kaddr = kmap(page) + page_shift;
+ max_write = bv->bv_len;
+#endif
} else {
page = con->msgr->zero_page;
if (crc)
kaddr = page_address(con->msgr->zero_page);
}
- len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
- (int)(data_len - con->out_msg_pos.data_pos));
+ len = min_t(int, max_write - con->out_msg_pos.page_pos,
+ data_len - con->out_msg_pos.data_pos);
if (crc && !con->out_msg_pos.did_page_crc) {
void *base = kaddr + con->out_msg_pos.page_pos;
u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -777,11 +825,12 @@ static int write_partial_msg_pages(struct ceph_connection *con)
}
ret = kernel_sendpage(con->sock, page,
- con->out_msg_pos.page_pos, len,
+ con->out_msg_pos.page_pos + page_shift,
+ len,
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);
- if (crc && (msg->pages || msg->pagelist))
+ if (crc && (msg->pages || msg->pagelist || msg->bio))
kunmap(page);
if (ret <= 0)
@@ -796,6 +845,10 @@ static int write_partial_msg_pages(struct ceph_connection *con)
if (msg->pagelist)
list_move_tail(&page->lru,
&msg->pagelist->head);
+#ifdef CONFIG_BLOCK
+ if (msg->bio)
+ iter_bio_next(&msg->bio_iter, &msg->bio_seg);
+#endif
}
}
@@ -1302,8 +1355,7 @@ static int read_partial_message_section(struct ceph_connection *con,
struct kvec *section, unsigned int sec_len,
u32 *crc)
{
- int left;
- int ret;
+ int ret, left;
BUG_ON(!section);
@@ -1326,13 +1378,83 @@ static int read_partial_message_section(struct ceph_connection *con,
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);
+
+
+static int read_partial_message_pages(struct ceph_connection *con,
+ struct page **pages,
+ unsigned data_len, int datacrc)
+{
+ void *p;
+ int ret;
+ int left;
+
+ left = min((int)(data_len - con->in_msg_pos.data_pos),
+ (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+ /* (page) data */
+ BUG_ON(pages == NULL);
+ p = kmap(pages[con->in_msg_pos.page]);
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
+ if (ret > 0 && datacrc)
+ con->in_data_crc =
+ crc32c(con->in_data_crc,
+ p + con->in_msg_pos.page_pos, ret);
+ kunmap(pages[con->in_msg_pos.page]);
+ if (ret <= 0)
+ return ret;
+ con->in_msg_pos.data_pos += ret;
+ con->in_msg_pos.page_pos += ret;
+ if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+ con->in_msg_pos.page_pos = 0;
+ con->in_msg_pos.page++;
+ }
+
+ return ret;
+}
+
+#ifdef CONFIG_BLOCK
+static int read_partial_message_bio(struct ceph_connection *con,
+ struct bio **bio_iter, int *bio_seg,
+ unsigned data_len, int datacrc)
+{
+ struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
+ void *p;
+ int ret, left;
+
+ if (IS_ERR(bv))
+ return PTR_ERR(bv);
+
+ left = min((int)(data_len - con->in_msg_pos.data_pos),
+ (int)(bv->bv_len - con->in_msg_pos.page_pos));
+
+ p = kmap(bv->bv_page) + bv->bv_offset;
+
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
+ if (ret > 0 && datacrc)
+ con->in_data_crc =
+ crc32c(con->in_data_crc,
+ p + con->in_msg_pos.page_pos, ret);
+ kunmap(bv->bv_page);
+ if (ret <= 0)
+ return ret;
+ con->in_msg_pos.data_pos += ret;
+ con->in_msg_pos.page_pos += ret;
+ if (con->in_msg_pos.page_pos == bv->bv_len) {
+ con->in_msg_pos.page_pos = 0;
+ iter_bio_next(bio_iter, bio_seg);
+ }
+
+ return ret;
+}
+#endif
+
/*
* read (part of) a message.
*/
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
- void *p;
int ret;
int to, left;
unsigned front_len, middle_len, data_len, data_off;
@@ -1417,7 +1539,10 @@ static int read_partial_message(struct ceph_connection *con)
m->middle->vec.iov_len = 0;
con->in_msg_pos.page = 0;
- con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ if (m->pages)
+ con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ else
+ con->in_msg_pos.page_pos = 0;
con->in_msg_pos.data_pos = 0;
}
@@ -1434,27 +1559,29 @@ static int read_partial_message(struct ceph_connection *con)
if (ret <= 0)
return ret;
}
+#ifdef CONFIG_BLOCK
+ if (m->bio && !m->bio_iter)
+ init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
+#endif
/* (page) data */
while (con->in_msg_pos.data_pos < data_len) {
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
- BUG_ON(m->pages == NULL);
- p = kmap(m->pages[con->in_msg_pos.page]);
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
- if (ret > 0 && datacrc)
- con->in_data_crc =
- crc32c(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(m->pages[con->in_msg_pos.page]);
- if (ret <= 0)
- return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == PAGE_SIZE) {
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.page++;
+ if (m->pages) {
+ ret = read_partial_message_pages(con, m->pages,
+ data_len, datacrc);
+ if (ret <= 0)
+ return ret;
+#ifdef CONFIG_BLOCK
+ } else if (m->bio) {
+
+ ret = read_partial_message_bio(con,
+ &m->bio_iter, &m->bio_seg,
+ data_len, datacrc);
+ if (ret <= 0)
+ return ret;
+#endif
+ } else {
+ BUG_ON(1);
}
}
@@ -2130,6 +2257,9 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
m->nr_pages = 0;
m->pages = NULL;
m->pagelist = NULL;
+ m->bio = NULL;
+ m->bio_iter = NULL;
+ m->bio_seg = 0;
dout("ceph_msg_new %p front %d\n", m, front_len);
return m;
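
Together with the bio fields added to struct ceph_msg in the messenger.h hunk that follows, the messenger can now source or sink message data from a bio chain instead of a page vector. A sketch of how the two static iterator helpers above (init_bio_iter()/iter_bio_next(), local to messenger.c) walk a chain segment by segment, assuming the bio API of this era (bi_idx/bi_vcnt/bio_iovec_idx); illustrative only, not part of the patch:

static void example_walk_bio_chain(struct bio *bio)
{
	struct bio *iter;
	int seg;

	init_bio_iter(bio, &iter, &seg);
	while (iter) {
		struct bio_vec *bv = bio_iovec_idx(iter, seg);

		pr_debug("bio segment: page %p off %u len %u\n",
			 bv->bv_page, bv->bv_offset, bv->bv_len);
		iter_bio_next(&iter, &seg);	/* hops to bi_next at end of vecs */
	}
}

write_partial_msg_pages() and read_partial_message_bio() advance through msg->bio this way, one bio_vec per sendpage/recvmsg pass.
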
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 76fbc957bc13..aea42b9fbee3 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -82,6 +82,9 @@ struct ceph_msg {
struct ceph_pagelist *pagelist; /* instead of pages */
struct list_head list_head;
struct kref kref;
+ struct bio *bio; /* instead of pages/pagelist */
+ struct bio *bio_iter; /* bio iterator */
+ int bio_seg; /* current bio segment */
bool front_is_vmalloc;
bool more_to_follow;
bool needs_out_seq;
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 21c62e9b7d1d..b319b91fb60a 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -349,7 +349,7 @@ out:
}
/*
- * statfs
+ * generic requests (e.g., statfs, poolop)
*/
static struct ceph_mon_generic_request *__lookup_generic_req(
struct ceph_mon_client *monc, u64 tid)
@@ -440,6 +440,35 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
return m;
}
+static int do_generic_request(struct ceph_mon_client *monc,
+ struct ceph_mon_generic_request *req)
+{
+ int err;
+
+ /* register request */
+ mutex_lock(&monc->mutex);
+ req->tid = ++monc->last_tid;
+ req->request->hdr.tid = cpu_to_le64(req->tid);
+ __insert_generic_request(monc, req);
+ monc->num_generic_requests++;
+ ceph_con_send(monc->con, ceph_msg_get(req->request));
+ mutex_unlock(&monc->mutex);
+
+ err = wait_for_completion_interruptible(&req->completion);
+
+ mutex_lock(&monc->mutex);
+ rb_erase(&req->node, &monc->generic_request_tree);
+ monc->num_generic_requests--;
+ mutex_unlock(&monc->mutex);
+
+ if (!err)
+ err = req->result;
+ return err;
+}
+
+/*
+ * statfs
+ */
static void handle_statfs_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
@@ -466,7 +495,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
return;
bad:
- pr_err("corrupt generic reply, no tid\n");
+ pr_err("corrupt generic reply, tid %llu\n", tid);
ceph_msg_dump(msg);
}
@@ -485,6 +514,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
kref_init(&req->kref);
req->buf = buf;
+ req->buf_len = sizeof(*buf);
init_completion(&req->completion);
err = -ENOMEM;
@@ -502,33 +532,134 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
- /* register request */
- mutex_lock(&monc->mutex);
- req->tid = ++monc->last_tid;
- req->request->hdr.tid = cpu_to_le64(req->tid);
- __insert_generic_request(monc, req);
- monc->num_generic_requests++;
- mutex_unlock(&monc->mutex);
+ err = do_generic_request(monc, req);
- /* send request and wait */
- ceph_con_send(monc->con, ceph_msg_get(req->request));
- err = wait_for_completion_interruptible(&req->completion);
+out:
+ kref_put(&req->kref, release_generic_request);
+ return err;
+}
+
+/*
+ * pool ops
+ */
+static int get_poolop_reply_buf(const char *src, size_t src_len,
+ char *dst, size_t dst_len)
+{
+ u32 buf_len;
+
+ if (src_len != sizeof(u32) + dst_len)
+ return -EINVAL;
+
+ buf_len = le32_to_cpu(*(u32 *)src);
+ if (buf_len != dst_len)
+ return -EINVAL;
+
+ memcpy(dst, src + sizeof(u32), dst_len);
+ return 0;
+}
+
+static void handle_poolop_reply(struct ceph_mon_client *monc,
+ struct ceph_msg *msg)
+{
+ struct ceph_mon_generic_request *req;
+ struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
+ u64 tid = le64_to_cpu(msg->hdr.tid);
+
+ if (msg->front.iov_len < sizeof(*reply))
+ goto bad;
+ dout("handle_poolop_reply %p tid %llu\n", msg, tid);
mutex_lock(&monc->mutex);
- rb_erase(&req->node, &monc->generic_request_tree);
- monc->num_generic_requests--;
+ req = __lookup_generic_req(monc, tid);
+ if (req) {
+ if (req->buf_len &&
+ get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
+ msg->front.iov_len - sizeof(*reply),
+ req->buf, req->buf_len) < 0) {
+ mutex_unlock(&monc->mutex);
+ goto bad;
+ }
+ req->result = le32_to_cpu(reply->reply_code);
+ get_generic_request(req);
+ }
mutex_unlock(&monc->mutex);
+ if (req) {
+ complete(&req->completion);
+ put_generic_request(req);
+ }
+ return;
- if (!err)
- err = req->result;
+bad:
+ pr_err("corrupt generic reply, tid %llu\n", tid);
+ ceph_msg_dump(msg);
+}
+
+/*
+ * Do a synchronous pool op.
+ */
+int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
+ u32 pool, u64 snapid,
+ char *buf, int len)
+{
+ struct ceph_mon_generic_request *req;
+ struct ceph_mon_poolop *h;
+ int err;
+
+ req = kzalloc(sizeof(*req), GFP_NOFS);
+ if (!req)
+ return -ENOMEM;
+
+ kref_init(&req->kref);
+ req->buf = buf;
+ req->buf_len = len;
+ init_completion(&req->completion);
+
+ err = -ENOMEM;
+ req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
+ if (!req->request)
+ goto out;
+ req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
+ if (!req->reply)
+ goto out;
+
+ /* fill out request */
+ req->request->hdr.version = cpu_to_le16(2);
+ h = req->request->front.iov_base;
+ h->monhdr.have_version = 0;
+ h->monhdr.session_mon = cpu_to_le16(-1);
+ h->monhdr.session_mon_tid = 0;
+ h->fsid = monc->monmap->fsid;
+ h->pool = cpu_to_le32(pool);
+ h->op = cpu_to_le32(op);
+ h->auid = 0;
+ h->snapid = cpu_to_le64(snapid);
+ h->name_len = 0;
+
+ err = do_generic_request(monc, req);
out:
kref_put(&req->kref, release_generic_request);
return err;
}
+int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+ u32 pool, u64 *snapid)
+{
+ return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
+ pool, 0, (char *)snapid, sizeof(*snapid));
+
+}
+
+int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+ u32 pool, u64 snapid)
+{
+	return ceph_monc_do_poolop(monc, POOL_OP_DELETE_UNMANAGED_SNAP,
+ pool, snapid, 0, 0);
+
+}
+
/*
- * Resend pending statfs requests.
+ * Resend pending generic requests.
*/
static void __resend_generic_request(struct ceph_mon_client *monc)
{
@@ -780,12 +911,17 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_statfs_reply(monc, msg);
break;
+ case CEPH_MSG_POOLOP_REPLY:
+ handle_poolop_reply(monc, msg);
+ break;
+
case CEPH_MSG_MON_MAP:
ceph_monc_handle_map(monc, msg);
break;
case CEPH_MSG_MDS_MAP:
- ceph_mdsc_handle_map(&monc->client->mdsc, msg);
+ if (monc->client->have_mdsc)
+ ceph_mdsc_handle_map(&monc->client->mdsc, msg);
break;
case CEPH_MSG_OSD_MAP:
@@ -817,6 +953,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_MON_SUBSCRIBE_ACK:
m = ceph_msg_get(monc->m_subscribe_ack);
break;
+ case CEPH_MSG_POOLOP_REPLY:
case CEPH_MSG_STATFS_REPLY:
return get_generic_reply(con, hdr, skip);
case CEPH_MSG_AUTH_REPLY:
diff --git a/fs/ceph/mon_client.h b/fs/ceph/mon_client.h
index 174d794321d0..8e396f2c0963 100644
--- a/fs/ceph/mon_client.h
+++ b/fs/ceph/mon_client.h
@@ -50,6 +50,7 @@ struct ceph_mon_generic_request {
struct rb_node node;
int result;
void *buf;
+ int buf_len;
struct completion completion;
struct ceph_msg *request; /* original request */
struct ceph_msg *reply; /* and reply */
@@ -111,6 +112,10 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);
extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
+extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+ u32 pool, u64 *snapid);
+extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+ u32 pool, u64 snapid);
#endif
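
The mon_client changes generalize the old statfs path into do_generic_request() and add pool-op support; ceph_monc_create_snapid() and ceph_monc_delete_snapid() are the first users. A sketch of allocating and then releasing an unmanaged snapshot id, assuming monc and poolid come from an already-mounted ceph_client; illustrative only, not part of the patch:

static int example_snapid_cycle(struct ceph_mon_client *monc, u32 poolid)
{
	u64 snapid;
	int ret;

	ret = ceph_monc_create_snapid(monc, poolid, &snapid);
	if (ret < 0)
		return ret;

	pr_info("allocated unmanaged snapid %llu in pool %u\n",
		(unsigned long long)snapid, poolid);

	/* hand it back when done */
	return ceph_monc_delete_snapid(monc, poolid, snapid);
}

rbd_header_add_snap() in rbd.c below calls ceph_monc_create_snapid() the same way when a snapshot is created.
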
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index d25b4add85b4..d7515e89573f 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -6,6 +6,9 @@
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
+#ifdef CONFIG_BLOCK
+#include <linux/bio.h>
+#endif
#include "super.h"
#include "osd_client.h"
@@ -22,6 +25,35 @@ static int __kick_requests(struct ceph_osd_client *osdc,
static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+ struct ceph_file_layout *layout,
+ u64 snapid,
+ u64 off, u64 len, u64 *bno,
+ struct ceph_osd_request *req)
+{
+ struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
+ struct ceph_osd_op *op = (void *)(reqhead + 1);
+ u64 orig_len = len;
+ u64 objoff, objlen; /* extent in object */
+
+ reqhead->snapid = cpu_to_le64(snapid);
+
+ /* object extent? */
+ ceph_calc_file_object_mapping(layout, off, &len, bno,
+ &objoff, &objlen);
+ if (len < orig_len)
+ dout(" skipping last %llu, final file extent %llu~%llu\n",
+ orig_len - len, off, len);
+
+ op->extent.offset = cpu_to_le64(objoff);
+ op->extent.length = cpu_to_le64(objlen);
+ req->r_num_pages = calc_pages_for(off, len);
+
+ dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
+ *bno, objoff, objlen, req->r_num_pages);
+
+}
+
/*
* Implement client access to distributed object storage cluster.
*
@@ -48,34 +80,17 @@ static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
* fill osd op in request message.
*/
static void calc_layout(struct ceph_osd_client *osdc,
- struct ceph_vino vino, struct ceph_file_layout *layout,
+ struct ceph_vino vino,
+ struct ceph_file_layout *layout,
u64 off, u64 *plen,
struct ceph_osd_request *req)
{
- struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
- struct ceph_osd_op *op = (void *)(reqhead + 1);
- u64 orig_len = *plen;
- u64 objoff, objlen; /* extent in object */
u64 bno;
- reqhead->snapid = cpu_to_le64(vino.snap);
-
- /* object extent? */
- ceph_calc_file_object_mapping(layout, off, plen, &bno,
- &objoff, &objlen);
- if (*plen < orig_len)
- dout(" skipping last %llu, final file extent %llu~%llu\n",
- orig_len - *plen, off, *plen);
+ ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req);
sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
req->r_oid_len = strlen(req->r_oid);
-
- op->extent.offset = cpu_to_le64(objoff);
- op->extent.length = cpu_to_le64(objlen);
- req->r_num_pages = calc_pages_for(off, *plen);
-
- dout("calc_layout %s (%d) %llu~%llu (%d pages)\n",
- req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
}
/*
@@ -101,6 +116,10 @@ void ceph_osdc_release_request(struct kref *kref)
if (req->r_own_pages)
ceph_release_page_vector(req->r_pages,
req->r_num_pages);
+#ifdef CONFIG_BLOCK
+ if (req->r_bio)
+ bio_put(req->r_bio);
+#endif
ceph_put_snap_context(req->r_snapc);
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
@@ -108,43 +127,35 @@ void ceph_osdc_release_request(struct kref *kref)
kfree(req);
}
-/*
- * build new request AND message, calculate layout, and adjust file
- * extent as needed.
- *
- * if the file was recently truncated, we include information about its
- * old and new size so that the object can be updated appropriately. (we
- * avoid synchronously deleting truncated objects because it's slow.)
- *
- * if @do_sync, include a 'startsync' command so that the osd will flush
- * data quickly.
- */
-struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
- struct ceph_file_layout *layout,
- struct ceph_vino vino,
- u64 off, u64 *plen,
- int opcode, int flags,
+struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
+ int flags,
struct ceph_snap_context *snapc,
int do_sync,
- u32 truncate_seq,
- u64 truncate_size,
- struct timespec *mtime,
- bool use_mempool, int num_reply)
+ bool use_mempool,
+ gfp_t gfp_flags,
+ struct page **pages,
+ struct bio *bio)
{
struct ceph_osd_request *req;
struct ceph_msg *msg;
- struct ceph_osd_request_head *head;
- struct ceph_osd_op *op;
- void *p;
int num_op = 1 + do_sync;
- size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
- int i;
+ size_t msg_size = sizeof(struct ceph_osd_request_head) +
+ num_op*sizeof(struct ceph_osd_op);
+
+ if (use_mempool) {
+ req = mempool_alloc(osdc->req_mempool, gfp_flags);
+ memset(req, 0, sizeof(*req));
+ } else {
+ req = kzalloc(sizeof(*req), gfp_flags);
+ }
+ if (!req)
+ return NULL;
if (use_mempool) {
- req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
+ req = mempool_alloc(osdc->req_mempool, gfp_flags);
memset(req, 0, sizeof(*req));
} else {
- req = kzalloc(sizeof(*req), GFP_NOFS);
+ req = kzalloc(sizeof(*req), gfp_flags);
}
if (req == NULL)
return NULL;
@@ -164,7 +175,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
else
msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
- OSD_OPREPLY_FRONT_LEN, GFP_NOFS);
+ OSD_OPREPLY_FRONT_LEN, gfp_flags);
if (!msg) {
ceph_osdc_put_request(req);
return NULL;
@@ -178,18 +189,54 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
else
- msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS);
+ msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
if (!msg) {
ceph_osdc_put_request(req);
return NULL;
}
msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
memset(msg->front.iov_base, 0, msg->front.iov_len);
+
+ req->r_request = msg;
+ req->r_pages = pages;
+#ifdef CONFIG_BLOCK
+ if (bio) {
+ req->r_bio = bio;
+ bio_get(req->r_bio);
+ }
+#endif
+
+ return req;
+}
+
+/*
+ * build new request AND message
+ *
+ */
+void ceph_osdc_build_request(struct ceph_osd_request *req,
+ u64 off, u64 *plen,
+ int opcode,
+ struct ceph_snap_context *snapc,
+ int do_sync,
+ u32 truncate_seq,
+ u64 truncate_size,
+ struct timespec *mtime,
+ const char *oid,
+ int oid_len)
+{
+ struct ceph_msg *msg = req->r_request;
+ struct ceph_osd_request_head *head;
+ struct ceph_osd_op *op;
+ void *p;
+ int num_op = 1 + do_sync;
+ size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
+ int i;
+ int flags = req->r_flags;
+
head = msg->front.iov_base;
op = (void *)(head + 1);
p = (void *)(op + num_op);
- req->r_request = msg;
req->r_snapc = ceph_get_snap_context(snapc);
head->client_inc = cpu_to_le32(1); /* always, for now. */
@@ -199,10 +246,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
head->num_ops = cpu_to_le16(num_op);
op->op = cpu_to_le16(opcode);
- /* calculate max write size */
- calc_layout(osdc, vino, layout, off, plen, req);
- req->r_file_layout = *layout; /* keep a copy */
-
if (flags & CEPH_OSD_FLAG_WRITE) {
req->r_request->hdr.data_off = cpu_to_le16(off);
req->r_request->hdr.data_len = cpu_to_le32(*plen);
@@ -212,9 +255,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
op->extent.truncate_seq = cpu_to_le32(truncate_seq);
/* fill in oid */
- head->object_len = cpu_to_le32(req->r_oid_len);
- memcpy(p, req->r_oid, req->r_oid_len);
- p += req->r_oid_len;
+ head->object_len = cpu_to_le32(oid_len);
+ memcpy(p, oid, oid_len);
+ p += oid_len;
if (do_sync) {
op++;
@@ -233,6 +276,50 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
msg_size = p - msg->front.iov_base;
msg->front.iov_len = msg_size;
msg->hdr.front_len = cpu_to_le32(msg_size);
+ return;
+}
+
+/*
+ * build new request AND message, calculate layout, and adjust file
+ * extent as needed.
+ *
+ * if the file was recently truncated, we include information about its
+ * old and new size so that the object can be updated appropriately. (we
+ * avoid synchronously deleting truncated objects because it's slow.)
+ *
+ * if @do_sync, include a 'startsync' command so that the osd will flush
+ * data quickly.
+ */
+struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
+ struct ceph_file_layout *layout,
+ struct ceph_vino vino,
+ u64 off, u64 *plen,
+ int opcode, int flags,
+ struct ceph_snap_context *snapc,
+ int do_sync,
+ u32 truncate_seq,
+ u64 truncate_size,
+ struct timespec *mtime,
+ bool use_mempool, int num_reply)
+{
+ struct ceph_osd_request *req =
+ ceph_osdc_alloc_request(osdc, flags,
+ snapc, do_sync,
+ use_mempool,
+ GFP_NOFS, NULL, NULL);
+ if (IS_ERR(req))
+ return req;
+
+ /* calculate max write size */
+ calc_layout(osdc, vino, layout, off, plen, req);
+ req->r_file_layout = *layout; /* keep a copy */
+
+ ceph_osdc_build_request(req, off, plen, opcode,
+ snapc, do_sync,
+ truncate_seq, truncate_size,
+ mtime,
+ req->r_oid, req->r_oid_len);
+
return req;
}
@@ -1104,6 +1191,9 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
req->r_request->pages = req->r_pages;
req->r_request->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+ req->r_request->bio = req->r_bio;
+#endif
register_request(osdc, req);
@@ -1422,6 +1512,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
}
m->pages = req->r_pages;
m->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+ m->bio = req->r_bio;
+#endif
}
*skip = 0;
req->r_con_filling_msg = ceph_con_get(con);
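
The osd_client refactor splits ceph_osdc_new_request() into ceph_osdc_alloc_request(), ceph_calc_raw_layout() and ceph_osdc_build_request(), so a caller that is not file-backed (rbd) can drive the pieces itself. A condensed sketch of that flow, modelled on rbd_do_request() further down; names such as osdc, layout, snapc and oid are assumed to be set up by the caller, and error handling is trimmed; illustrative only, not part of the patch:

static int example_split_request(struct ceph_osd_client *osdc,
				 struct ceph_file_layout *layout,
				 struct ceph_snap_context *snapc,
				 const char *oid, u64 off, u64 len,
				 struct page **pages)
{
	struct ceph_osd_request *req;
	struct timespec mtime = CURRENT_TIME;
	u64 bno;
	int ret;

	/* allocate request + reply messages, attach the data pages */
	req = ceph_osdc_alloc_request(osdc, CEPH_OSD_FLAG_WRITE |
				      CEPH_OSD_FLAG_ONDISK,
				      snapc, 0, false, GFP_NOFS, pages, NULL);
	if (!req)
		return -ENOMEM;

	strncpy(req->r_oid, oid, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* map the byte extent onto an object and fill in the osd op */
	ceph_calc_raw_layout(osdc, layout, CEPH_NOSNAP, off, len, &bno, req);
	ceph_osdc_build_request(req, off, &len, CEPH_OSD_OP_WRITE,
				snapc, 0, 0, 0, &mtime,
				req->r_oid, req->r_oid_len);

	ret = ceph_osdc_start_request(osdc, req, false);
	if (!ret)
		ret = ceph_osdc_wait_request(osdc, req);
	ceph_osdc_put_request(req);
	return ret;
}
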
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index ce776989ef6a..2daa3fcae63e 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -68,6 +68,7 @@ struct ceph_osd_request {
struct list_head r_unsafe_item;
struct inode *r_inode; /* for use by callbacks */
+ void *r_priv; /* ditto */
char r_oid[40]; /* object name */
int r_oid_len;
@@ -80,6 +81,9 @@ struct ceph_osd_request {
struct page **r_pages; /* pages for data payload */
int r_pages_from_pool;
int r_own_pages; /* if true, i own page list */
+#ifdef CONFIG_BLOCK
+ struct bio *r_bio; /* instead of pages */
+#endif
};
struct ceph_osd_client {
@@ -119,6 +123,32 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
struct ceph_msg *msg);
+extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+ struct ceph_file_layout *layout,
+ u64 snapid,
+ u64 off, u64 len, u64 *bno,
+ struct ceph_osd_request *req);
+
+extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
+ int flags,
+ struct ceph_snap_context *snapc,
+ int do_sync,
+ bool use_mempool,
+ gfp_t gfp_flags,
+ struct page **pages,
+ struct bio *bio);
+
+extern void ceph_osdc_build_request(struct ceph_osd_request *req,
+ u64 off, u64 *plen,
+ int opcode,
+ struct ceph_snap_context *snapc,
+ int do_sync,
+ u32 truncate_seq,
+ u64 truncate_size,
+ struct timespec *mtime,
+ const char *oid,
+ int oid_len);
+
extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
struct ceph_file_layout *layout,
struct ceph_vino vino,
diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c
index ddc656fb5c05..11aac1c60943 100644
--- a/fs/ceph/osdmap.c
+++ b/fs/ceph/osdmap.c
@@ -417,6 +417,19 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
return NULL;
}
+int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
+{
+ struct rb_node *rbp;
+
+ for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
+ struct ceph_pg_pool_info *pi =
+ rb_entry(rbp, struct ceph_pg_pool_info, node);
+ if (pi->name && strcmp(pi->name, name) == 0)
+ return pi->id;
+ }
+ return -ENOENT;
+}
+
static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
{
rb_erase(&pi->node, root);
diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h
index 970b547e510d..a592b211be39 100644
--- a/fs/ceph/osdmap.h
+++ b/fs/ceph/osdmap.h
@@ -125,4 +125,6 @@ extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
struct ceph_pg pgid);
+extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
+
#endif
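
ceph_pg_poolid_by_name() lets rbd translate the pool name given on the sysfs add interface into a pool id. A short sketch, assuming the client already has an open session so that osdc.osdmap is populated; illustrative only, not part of the patch:

static int example_lookup_pool(struct ceph_client *client, const char *name)
{
	int poolid = ceph_pg_poolid_by_name(client->osdc.osdmap, name);

	if (poolid < 0)		/* -ENOENT when no pool has that name */
		return poolid;

	pr_info("pool '%s' has id %d\n", name, poolid);
	return poolid;
}

rbd_init_client() in rbd.c below does the same lookup right after ceph_open_session().
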
diff --git a/fs/ceph/rbd.c b/fs/ceph/rbd.c
new file mode 100644
index 000000000000..a745f3f3481c
--- /dev/null
+++ b/fs/ceph/rbd.c
@@ -0,0 +1,1803 @@
+/*
+ rbd.c -- Export ceph rados objects as a Linux block device
+
+
+ based on drivers/block/osdblk.c:
+
+ Copyright 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to
+ the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+
+
+
+ Instructions for use
+ --------------------
+
+ 1) Map a Linux block device to an existing rbd image.
+
+ Usage: <mon ip addr> <options> <pool name> <rbd image name>
+
+ $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add
+
+
+ 2) List all active blkdev<->object mappings.
+
+ In this example, we have performed step #1 twice, creating two blkdevs,
+ mapped to two separate rados objects in the rados rbd pool
+
+ $ cat /sys/class/rbd/list
+ 0 254 rbd foo
+ 1 253 rbd bar
+
+ The columns, in order, are:
+ - blkdev unique id
+ - blkdev assigned major
+ - rados pool name
+ - rados block device name
+
+
+ 3) Remove an active blkdev<->rbd image mapping.
+
+ In this example, we remove the mapping with blkdev unique id 1.
+
+ $ echo 1 > /sys/class/rbd/remove
+
+
+ NOTE: The actual creation and deletion of rados objects is outside the scope
+ of this driver.
+
+ */
+
+#include "super.h"
+#include "osd_client.h"
+#include "rbd_types.h"
+#include "mon_client.h"
+
+#include <linux/kernel.h>
+#include <linux/device.h>
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/blkdev.h>
+
+#define DRV_NAME "rbd"
+#define DRV_NAME_LONG "rbd (rados block device)"
+
+enum {
+ RBD_MINORS_PER_MAJOR = 256, /* max minors per blkdev */
+};
+
+#define RBD_MAX_POOL_NAME_SIZE 64
+
+#define RBD_STRIPE_UNIT (1 << 22)
+
+#define RBD_MAX_OPT_LEN 1024
+#define RBD_MAX_SNAP_NAME_LEN 32
+
+#define RBD_SNAP_OP_CREATE 0x1
+#define RBD_SNAP_OP_SET 0x2
+
+#define RBD_SNAP_HEAD_NAME "head"
+
+struct rbd_obj_header {
+ u64 image_size;
+ __u8 obj_order;
+ __u8 crypt_type;
+ __u8 comp_type;
+ struct rw_semaphore snap_rwsem;
+ struct ceph_snap_context *snapc;
+ size_t snap_names_len;
+ u32 snap_seq;
+ u32 total_snaps;
+
+ char *snap_names;
+ u64 *snap_sizes;
+};
+
+struct rbd_request {
+ struct request *rq; /* blk layer request */
+ struct bio *bio; /* cloned bio */
+ struct page **pages; /* list of used pages */
+ u64 len;
+};
+
+struct rbd_client_node {
+ struct ceph_client *client;
+ const char *opt;
+ struct kref kref;
+ struct list_head node;
+};
+
+#define DEV_NAME_LEN 32
+
+struct rbd_device {
+ int id; /* blkdev unique id */
+
+ int major; /* blkdev assigned major */
+ struct gendisk *disk; /* blkdev's gendisk and rq */
+ struct request_queue *q;
+
+ struct ceph_client *client; /* associated OSD */
+
+ char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
+
+ spinlock_t lock; /* queue lock */
+
+ struct rbd_obj_header header;
+ char obj[RBD_MAX_OBJ_NAME_SIZE]; /* rbd image name */
+ int obj_len;
+ char pool_name[RBD_MAX_POOL_NAME_SIZE];
+ int poolid;
+
+ u32 cur_snap; /* index+1 of current snapshot within snap context
+ 0 - for the head */
+ int read_only;
+
+ struct list_head node;
+ struct rbd_client_node *client_node;
+};
+
+static spinlock_t node_lock; /* protects client get/put */
+
+static struct class *class_rbd; /* /sys/class/rbd */
+static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
+static LIST_HEAD(rbddev_list);
+static LIST_HEAD(node_list);
+
+
+static int rbd_open(struct block_device *bdev, fmode_t mode)
+{
+ struct gendisk *disk = bdev->bd_disk;
+ struct rbd_device *rbd_dev = disk->private_data;
+
+ if (mode & FMODE_WRITE && rbd_dev->read_only)
+ return -EROFS;
+
+ return 0;
+}
+
+static const struct block_device_operations rbd_bd_ops = {
+ .owner = THIS_MODULE,
+ .open = rbd_open,
+};
+
+/*
+ * Initialize ceph client for a specific device.
+ */
+static int rbd_init_client(struct rbd_device *rbd_dev,
+ struct ceph_mount_args *args)
+{
+ struct ceph_osd_client *osdc;
+ int ret;
+ dout("rbd_init_device\n");
+ rbd_dev->client = ceph_create_client(args, 0);
+ if (IS_ERR(rbd_dev->client))
+ return PTR_ERR(rbd_dev->client);
+
+ ret = ceph_open_session(rbd_dev->client);
+ if (ret < 0)
+ goto done_err;
+
+ osdc = &rbd_dev->client->osdc;
+ ret = ceph_pg_poolid_by_name(osdc->osdmap,
+ rbd_dev->pool_name);
+ if (ret < 0)
+ goto done_err;
+
+ rbd_dev->poolid = ret;
+ return 0;
+
+done_err:
+ ceph_destroy_client(rbd_dev->client);
+ rbd_dev->client = NULL;
+ return ret;
+}
+
+/*
+ * Find a ceph client with specific addr and configuration.
+ */
+static struct rbd_client_node *__get_client_node(struct ceph_mount_args *args)
+{
+ struct rbd_client_node *client_node;
+
+ if (args->flags & CEPH_OPT_NOSHARE)
+ return NULL;
+
+ list_for_each_entry(client_node, &node_list, node)
+ if (ceph_compare_mount_args(args, client_node->client) == 0)
+ return client_node;
+ return NULL;
+}
+
+/*
+ * Get a ceph client with specific addr and configuration, if one does
+ * not exist create it.
+ */
+static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
+ char *opt)
+{
+ struct rbd_client_node *client_node;
+ struct ceph_mount_args *args;
+ int ret;
+
+
+ args = parse_mount_args(0, opt, mon_addr, NULL);
+ if (IS_ERR(args))
+ return PTR_ERR(args);
+
+ spin_lock(&node_lock);
+
+ client_node = __get_client_node(args);
+ if (client_node) {
+ ceph_destroy_mount_args(args);
+
+ kref_get(&client_node->kref);
+ rbd_dev->client_node = client_node;
+ rbd_dev->client = client_node->client;
+ spin_unlock(&node_lock);
+ return 0;
+ }
+
+ spin_unlock(&node_lock);
+
+ ret = -ENOMEM;
+ client_node = kmalloc(sizeof(struct rbd_client_node), GFP_KERNEL);
+ if (!client_node)
+ goto out_args;
+
+ ret = rbd_init_client(rbd_dev, args);
+ if (ret < 0)
+ goto out_free;
+
+ client_node->client = rbd_dev->client;
+ client_node->opt = kstrdup(opt, GFP_KERNEL);
+ kref_init(&client_node->kref);
+ INIT_LIST_HEAD(&client_node->node);
+
+ rbd_dev->client_node = client_node;
+
+ spin_lock(&node_lock);
+ list_add_tail(&client_node->node, &node_list);
+ spin_unlock(&node_lock);
+
+ return 0;
+
+out_free:
+ kfree(client_node);
+out_args:
+ ceph_destroy_mount_args(args);
+ return ret;
+}
+
+/*
+ * Destroy ceph client
+ */
+static void rbd_release_client(struct kref *kref)
+{
+ struct rbd_client_node *node =
+ container_of(kref, struct rbd_client_node, kref);
+
+ dout("rbd_release_client\n");
+
+ spin_lock(&node_lock);
+ list_del(&node->node);
+ spin_unlock(&node_lock);
+
+ ceph_destroy_client(node->client);
+ kfree(node->opt);
+ kfree(node);
+}
+
+/*
+ * Drop reference to ceph client node. If it's not referenced anymore, release
+ * it.
+ */
+static void rbd_put_client(struct rbd_device *rbd_dev)
+{
+ if (!rbd_dev->client_node)
+ return;
+
+ kref_put(&rbd_dev->client_node->kref, rbd_release_client);
+ rbd_dev->client_node = NULL;
+}
+
+static int snap_index(struct rbd_obj_header *header, int snap_num)
+{
+ return header->total_snaps - snap_num;
+}
+
+static u64 cur_snap_id(struct rbd_device *rbd_dev)
+{
+ struct rbd_obj_header *header = &rbd_dev->header;
+
+ if (!rbd_dev->cur_snap)
+ return 0;
+
+ return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
+}
+
+
+/*
+ * Create a new header structure, translate header format from the on-disk
+ * header.
+ */
+static int rbd_header_from_disk(struct rbd_obj_header *header,
+ struct rbd_obj_header_ondisk *ondisk,
+ int allocated_snaps,
+ gfp_t gfp_flags)
+{
+ int i;
+ u32 snap_count = le32_to_cpu(ondisk->snap_count);
+ int ret = -ENOMEM;
+
+ init_rwsem(&header->snap_rwsem);
+
+ header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
+ header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
+ snap_count *
+ sizeof(struct rbd_obj_snap_ondisk),
+ gfp_flags);
+ if (!header->snapc)
+ return -ENOMEM;
+ if (snap_count) {
+ header->snap_names = kmalloc(header->snap_names_len,
+ GFP_KERNEL);
+ if (!header->snap_names)
+ goto err_snapc;
+ header->snap_sizes = kmalloc(snap_count * sizeof(u64),
+ GFP_KERNEL);
+ if (!header->snap_sizes)
+ goto err_names;
+ } else {
+ header->snap_names = NULL;
+ header->snap_sizes = NULL;
+ }
+
+ header->image_size = le64_to_cpu(ondisk->image_size);
+ header->obj_order = ondisk->obj_order;
+ header->crypt_type = ondisk->crypt_type;
+ header->comp_type = ondisk->comp_type;
+
+ atomic_set(&header->snapc->nref, 1);
+ header->snap_seq = le32_to_cpu(ondisk->snap_seq);
+ header->snapc->num_snaps = snap_count;
+ header->total_snaps = snap_count;
+
+ if (snap_count &&
+ allocated_snaps == snap_count) {
+ for (i = 0; i < snap_count; i++) {
+ header->snapc->snaps[i] =
+ le64_to_cpu(ondisk->snaps[i].id);
+ header->snap_sizes[i] =
+ le64_to_cpu(ondisk->snaps[i].image_size);
+ }
+
+ /* copy snapshot names */
+ memcpy(header->snap_names, &ondisk->snaps[i],
+ header->snap_names_len);
+ }
+
+ return 0;
+
+err_names:
+ kfree(header->snap_names);
+err_snapc:
+ kfree(header->snapc);
+ return ret;
+}
+
+/*
+ * Create a new header structure, translate header format from the on-disk
+ * header.
+ */
+static int rbd_header_to_disk(struct rbd_obj_header_ondisk **ondisk,
+ struct rbd_obj_header_ondisk *old_ondisk,
+ struct rbd_obj_header *header,
+ gfp_t gfp_flags)
+{
+ int i;
+
+ down_read(&header->snap_rwsem);
+ *ondisk = kmalloc(sizeof(struct rbd_obj_header_ondisk) +
+ header->snap_names_len +
+ header->total_snaps *
+ sizeof(struct rbd_obj_snap_ondisk),
+ gfp_flags);
+ if (!*ondisk)
+ return -ENOMEM;
+
+ memcpy(*ondisk, old_ondisk, sizeof(*old_ondisk));
+
+ (*ondisk)->snap_seq = cpu_to_le32(header->snap_seq);
+ (*ondisk)->snap_count = cpu_to_le32(header->total_snaps);
+ (*ondisk)->snap_names_len = cpu_to_le64(header->snap_names_len);
+
+ if (header->total_snaps) {
+ for (i = 0; i < header->total_snaps; i++) {
+ (*ondisk)->snaps[i].id =
+ cpu_to_le64(header->snapc->snaps[i]);
+ (*ondisk)->snaps[i].image_size =
+ cpu_to_le64(header->snap_sizes[i]);
+ }
+
+ /* copy snapshot names */
+ memcpy(&(*ondisk)->snaps[i], header->snap_names,
+ header->snap_names_len);
+ }
+ up_read(&header->snap_rwsem);
+
+ return 0;
+}
+
+static int rbd_header_add_snap(struct rbd_device *dev,
+ const char *snap_name,
+ gfp_t gfp_flags)
+{
+ struct rbd_obj_header *header = &dev->header;
+ struct ceph_snap_context *new_snapc;
+ char *p;
+ int name_len = strlen(snap_name);
+ u64 *snaps = header->snapc->snaps;
+ u64 *new_sizes;
+ char *new_names;
+ u64 new_snapid;
+ int i;
+ int ret = -EINVAL;
+
+ down_write(&header->snap_rwsem);
+
+ /* we can create a snapshot only if we're pointing at the head */
+ if (dev->cur_snap)
+ goto done;
+
+ ret = -EEXIST;
+ p = header->snap_names;
+ for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
+ if (strcmp(snap_name, p) == 0)
+ goto done;
+ }
+
+ ret = -ENOMEM;
+	new_snapc = kmalloc(sizeof(struct ceph_snap_context) +
+ (header->total_snaps + 1) * sizeof(u64),
+ gfp_flags);
+ if (!new_snapc)
+ goto done;
+ new_names = kmalloc(header->snap_names_len + name_len + 1, gfp_flags);
+ if (!new_names)
+ goto err_snapc;
+ new_sizes = kmalloc((header->total_snaps + 1) * sizeof(u64),
+ gfp_flags);
+ if (!new_sizes)
+ goto err_names;
+
+ atomic_set(&new_snapc->nref, 1);
+ new_snapc->num_snaps = header->total_snaps + 1;
+ if (header->total_snaps)
+ memcpy(&new_snapc->snaps[1], snaps,
+ (header->total_snaps) * sizeof(u64));
+
+ ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
+ &new_snapid);
+ dout("created snapid=%lld\n", new_snapid);
+ if (ret < 0)
+ goto err_sizes;
+
+ new_snapc->seq = new_snapid; /* we're still pointing at the head */
+ header->snap_seq = new_snapid;
+ new_snapc->snaps[0] = new_snapid;
+
+ /* copy snap names */
+ if (header->snap_names)
+ memcpy(new_names + name_len + 1, header->snap_names,
+ header->snap_names_len);
+
+ memcpy(new_names, snap_name, name_len + 1);
+ header->snap_names_len += name_len + 1;
+
+ /* copy snap image sizes */
+ if (header->snap_sizes)
+ memcpy(new_sizes, header->snap_sizes,
+ header->total_snaps * sizeof(u64));
+ new_sizes[new_snapc->num_snaps - 1] = header->image_size;
+
+ header->total_snaps = new_snapc->num_snaps;
+
+ kfree(header->snapc);
+ header->snapc = new_snapc;
+ kfree(header->snap_names);
+ header->snap_names = new_names;
+ kfree(header->snap_sizes);
+ header->snap_sizes = new_sizes;
+
+ ret = 0;
+done:
+ up_write(&header->snap_rwsem);
+ return ret;
+err_sizes:
+ kfree(new_sizes);
+err_names:
+ kfree(new_names);
+err_snapc:
+ kfree(new_snapc);
+ up_write(&header->snap_rwsem);
+ return ret;
+}
+
+static int rbd_header_set_snap(struct rbd_device *dev,
+ const char *snap_name,
+ u64 *size)
+{
+ struct rbd_obj_header *header = &dev->header;
+ struct ceph_snap_context *snapc = header->snapc;
+ char *p;
+ int i;
+ int ret = -ENOENT;
+
+ down_write(&header->snap_rwsem);
+
+ if (!snap_name ||
+ !*snap_name ||
+ strcmp(snap_name, "-") == 0 ||
+ strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
+ if (header->total_snaps)
+ snapc->seq = header->snap_seq;
+ else
+ snapc->seq = 0;
+ dev->cur_snap = 0;
+ dev->read_only = 0;
+ if (size)
+ *size = header->image_size;
+ } else {
+ p = header->snap_names;
+ for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
+ if (strcmp(snap_name, p) == 0)
+ break;
+ }
+ if (i == header->total_snaps)
+ goto done;
+
+ snapc->seq = snapc->snaps[i];
+ dev->cur_snap = header->total_snaps - i;
+ dev->read_only = 1;
+ if (size)
+ *size = header->snap_sizes[i];
+ }
+
+ ret = 0;
+done:
+ up_write(&header->snap_rwsem);
+ return ret;
+}
+
+static void rbd_header_free(struct rbd_obj_header *header)
+{
+ kfree(header->snapc);
+ kfree(header->snap_names);
+ kfree(header->snap_sizes);
+}
+
+/*
+ * get the actual striped segment name, offset and length
+ */
+static u64 rbd_get_segment(struct rbd_obj_header *header,
+ const char *obj_name,
+ u64 ofs, u64 len,
+ char *seg_name, u64 *segofs)
+{
+ u64 seg = ofs >> header->obj_order;
+
+ if (seg_name)
+ snprintf(seg_name, RBD_MAX_SEG_NAME_SIZE,
+ "%s.%012llx", obj_name, seg);
+
+ ofs = ofs & ((1 << header->obj_order) - 1);
+ len = min_t(u64, len, (1 << header->obj_order) - ofs);
+
+ if (segofs)
+ *segofs = ofs;
+
+ return len;
+}
+
+static void bio_chain_put(struct bio *chain)
+{
+ struct bio *tmp;
+
+ while (chain) {
+ tmp = chain;
+ chain = chain->bi_next;
+
+ bio_put(tmp);
+ }
+}
+
+/*
+ * zeros a bio chain, starting at specific offset
+ */
+static void zero_bio_chain(struct bio *chain, int start_ofs)
+{
+ struct bio_vec *bv;
+ unsigned long flags;
+ void *buf;
+ int i;
+ int pos = 0;
+
+ while (chain) {
+ bio_for_each_segment(bv, chain, i) {
+ if (pos + bv->bv_len > start_ofs) {
+ int remainder = max(start_ofs - pos, 0);
+ buf = bvec_kmap_irq(bv, &flags);
+ memset(buf + remainder, 0,
+ bv->bv_len - remainder);
+ bvec_kunmap_irq(bv, &flags);
+ }
+ pos += bv->bv_len;
+ }
+
+ chain = chain->bi_next;
+ }
+}
+
+/*
+ * bio_chain_clone - clone a chain of bios up to a certain length.
+ * might return a bio_pair that will need to be released.
+ */
+static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
+ struct bio_pair **bp,
+ int len, gfp_t gfpmask)
+{
+ struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
+ int total = 0;
+
+ if (*bp) {
+ bio_pair_release(*bp);
+ *bp = NULL;
+ }
+
+ while (old_chain && (total < len)) {
+ tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
+ if (!tmp)
+ goto err_out;
+
+ if (total + old_chain->bi_size > len) {
+ struct bio_pair *bp;
+
+ /*
+ * this split can only happen with a single paged bio,
+ * split_bio will BUG_ON if this is not the case
+ */
+ dout("bio_chain_clone split! total=%d remaining=%d"
+ "bi_size=%d\n",
+ (int)total, (int)len-total,
+ (int)old_chain->bi_size);
+
+ /* split the bio. We'll release it either in the next
+ call, or it will have to be released outside */
+ bp = bio_split(old_chain, (len - total) / 512ULL);
+ if (!bp)
+ goto err_out;
+
+ __bio_clone(tmp, &bp->bio1);
+
+ *next = &bp->bio2;
+ } else {
+ __bio_clone(tmp, old_chain);
+ *next = old_chain->bi_next;
+ }
+
+ tmp->bi_bdev = NULL;
+ gfpmask &= ~__GFP_WAIT;
+ tmp->bi_next = NULL;
+
+ if (!new_chain) {
+ new_chain = tail = tmp;
+ } else {
+ tail->bi_next = tmp;
+ tail = tmp;
+ }
+ old_chain = old_chain->bi_next;
+
+ total += tmp->bi_size;
+ }
+
+ BUG_ON(total < len);
+
+ if (tail)
+ tail->bi_next = NULL;
+
+ *old = old_chain;
+
+ return new_chain;
+
+err_out:
+ dout("bio_chain_clone with err\n");
+ bio_chain_put(new_chain);
+ return NULL;
+}
+
+/*
+ * Send ceph osd request
+ */
+static int rbd_do_request(struct request *rq,
+ struct rbd_device *dev,
+ struct ceph_snap_context *snapc,
+ u64 snapid,
+ const char *obj, u64 ofs, u64 len,
+ struct bio *bio,
+ struct page **pages,
+ int num_pages,
+ int opcode, int flags,
+ int num_reply,
+ void (*rbd_cb)(struct ceph_osd_request *req,
+ struct ceph_msg *msg))
+{
+ struct ceph_osd_request *req;
+ struct ceph_file_layout *layout;
+ int ret;
+ u64 bno;
+ struct timespec mtime = CURRENT_TIME;
+ struct rbd_request *req_data;
+ struct ceph_osd_request_head *reqhead;
+ struct rbd_obj_header *header = &dev->header;
+
+ ret = -ENOMEM;
+ req_data = kzalloc(sizeof(*req_data), GFP_NOFS);
+ if (!req_data)
+ goto done;
+
+ dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
+
+ down_read(&header->snap_rwsem);
+
+ req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
+ snapc, 0,
+ false,
+ GFP_NOFS, pages, bio);
+ if (IS_ERR(req)) {
+ up_read(&header->snap_rwsem);
+ ret = PTR_ERR(req);
+ goto done_pages;
+ }
+
+ req->r_callback = rbd_cb;
+
+ req_data->rq = rq;
+ req_data->bio = bio;
+ req_data->pages = pages;
+ req_data->len = len;
+
+ req->r_priv = req_data;
+
+ reqhead = req->r_request->front.iov_base;
+ reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
+
+ strncpy(req->r_oid, obj, sizeof(req->r_oid));
+ req->r_oid_len = strlen(req->r_oid);
+
+ layout = &req->r_file_layout;
+ memset(layout, 0, sizeof(*layout));
+ layout->fl_stripe_unit = RBD_STRIPE_UNIT;
+ layout->fl_stripe_count = 1;
+ layout->fl_object_size = RBD_STRIPE_UNIT;
+ layout->fl_pg_preferred = -1;
+ layout->fl_pg_pool = dev->poolid;
+ ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
+ ofs, len, &bno, req);
+
+ ceph_osdc_build_request(req, ofs, &len, opcode,
+ snapc, 0,
+ 0, 0,
+ &mtime,
+ req->r_oid, req->r_oid_len);
+ up_read(&header->snap_rwsem);
+
+ ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
+ if (ret < 0)
+ goto done_err;
+
+ if (!rbd_cb) {
+ ret = ceph_osdc_wait_request(&dev->client->osdc, req);
+ ceph_osdc_put_request(req);
+ }
+ return ret;
+
+done_err:
+ bio_chain_put(req_data->bio);
+ ceph_osdc_put_request(req);
+done_pages:
+ kfree(req_data);
+done:
+ if (rq)
+ blk_end_request(rq, ret, len);
+ return ret;
+}
+
+/*
+ * Ceph osd op callback
+ */
+static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+{
+ struct rbd_request *req_data = req->r_priv;
+ struct ceph_osd_reply_head *replyhead;
+ struct ceph_osd_op *op;
+ __s32 rc;
+ u64 bytes;
+ int read_op;
+
+ /* parse reply */
+ replyhead = msg->front.iov_base;
+ WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
+ op = (void *)(replyhead + 1);
+ rc = le32_to_cpu(replyhead->result);
+ bytes = le64_to_cpu(op->extent.length);
+
+ dout("rbd_req_cb bytes=%lld rc=%d\n", bytes, rc);
+
+ read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
+
+ if (rc == -ENOENT && read_op) {
+ zero_bio_chain(req_data->bio, 0);
+ rc = 0;
+ } else if (rc == 0 && read_op && bytes < req_data->len) {
+ zero_bio_chain(req_data->bio, bytes);
+ bytes = req_data->len;
+ }
+
+ blk_end_request(req_data->rq, rc, bytes);
+
+ if (req_data->bio)
+ bio_chain_put(req_data->bio);
+
+ ceph_osdc_put_request(req);
+ kfree(req_data);
+}
+
+/*
+ * Do a synchronous ceph osd operation
+ */
+static int rbd_req_sync_op(struct rbd_device *dev,
+ struct ceph_snap_context *snapc,
+ u64 snapid,
+ int opcode, int flags,
+ int num_reply,
+ const char *obj,
+ u64 ofs, u64 len,
+ char *buf)
+{
+ int ret;
+ struct page **pages;
+ int num_pages;
+
+ num_pages = calc_pages_for(ofs , len);
+ pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+ if (!pages)
+ return -ENOMEM;
+
+ if (flags & CEPH_OSD_FLAG_WRITE) {
+ ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
+ if (ret < 0)
+ goto done;
+ }
+
+ ret = rbd_do_request(NULL, dev, snapc, snapid,
+ obj, ofs, len, NULL,
+ pages, num_pages,
+ opcode,
+ flags,
+ 2,
+ NULL);
+ if (ret < 0)
+ goto done;
+
+ if (flags & CEPH_OSD_FLAG_READ)
+ ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
+
+done:
+ ceph_release_page_vector(pages, num_pages);
+ return ret;
+}
+
+/*
+ * Do an asynchronous ceph osd operation
+ */
+static int rbd_do_op(struct request *rq,
+ struct rbd_device *rbd_dev ,
+ struct ceph_snap_context *snapc,
+ u64 snapid,
+ int opcode, int flags, int num_reply,
+ u64 ofs, u64 len,
+ struct bio *bio)
+{
+ char *seg_name;
+ u64 seg_ofs;
+ u64 seg_len;
+ int ret;
+
+ seg_name = kmalloc(RBD_MAX_SEG_NAME_SIZE + 1, GFP_NOIO);
+ if (!seg_name)
+ return -ENOMEM;
+
+ seg_len = rbd_get_segment(&rbd_dev->header,
+ rbd_dev->obj,
+ ofs, len,
+ seg_name, &seg_ofs);
+ if (seg_len < 0)
+ return seg_len;
+
+ /* we've taken care of segment sizes earlier when we
+ cloned the bios. We should never have a segment
+ truncated at this point */
+ BUG_ON(seg_len < len);
+
+ ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
+ seg_name, seg_ofs, seg_len,
+ bio,
+ NULL, 0,
+ opcode,
+ flags,
+ num_reply,
+ rbd_req_cb);
+ kfree(seg_name);
+ return ret;
+}
+
+/*
+ * Request async osd write
+ */
+static int rbd_req_write(struct request *rq,
+ struct rbd_device *rbd_dev,
+ struct ceph_snap_context *snapc,
+ u64 ofs, u64 len,
+ struct bio *bio)
+{
+ return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
+ CEPH_OSD_OP_WRITE,
+ CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+ 2,
+ ofs, len, bio);
+}
+
+/*
+ * Request sync osd write
+ */
+static int rbd_req_sync_write(struct rbd_device *dev,
+ struct ceph_snap_context *snapc,
+ u64 snapid,
+ const char *obj,
+ u64 ofs, u64 len,
+ char *buf)
+{
+ return rbd_req_sync_op(dev, snapc, snapid,
+ CEPH_OSD_OP_WRITE,
+ CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+ 2, obj, ofs, len, buf);
+}
+
+/*
+ * Request async osd read
+ */
+static int rbd_req_read(struct request *rq,
+ struct rbd_device *rbd_dev,
+ u64 snapid,
+ u64 ofs, u64 len,
+ struct bio *bio)
+{
+ return rbd_do_op(rq, rbd_dev, NULL,
+ (snapid ? snapid : CEPH_NOSNAP),
+ CEPH_OSD_OP_READ,
+ CEPH_OSD_FLAG_READ,
+ 2,
+ ofs, len, bio);
+}
+
+/*
+ * Request sync osd read
+ */
+static int rbd_req_sync_read(struct rbd_device *dev,
+ struct ceph_snap_context *snapc,
+ u64 snapid,
+ const char *obj,
+ u64 ofs, u64 len,
+ char *buf)
+{
+ return rbd_req_sync_op(dev, NULL,
+ (snapid ? snapid : CEPH_NOSNAP),
+ CEPH_OSD_OP_READ,
+ CEPH_OSD_FLAG_READ,
+ 1, obj, ofs, len, buf);
+}
+
+/*
+ * block device queue callback
+ */
+static void rbd_rq_fn(struct request_queue *q)
+{
+ struct rbd_device *rbd_dev = q->queuedata;
+ struct request *rq;
+ struct bio_pair *bp = NULL;
+
+ rq = blk_fetch_request(q);
+
+ while (1) {
+ struct bio *bio;
+ struct bio *rq_bio, *next_bio = NULL;
+ bool do_write;
+ int size, op_size = 0;
+ u64 ofs;
+
+ /* peek at request from block layer */
+ if (!rq)
+ break;
+
+ dout("fetched request\n");
+
+ /* filter out block requests we don't understand */
+ if (!blk_fs_request(rq) && !blk_barrier_rq(rq)) {
+ __blk_end_request_all(rq, 0);
+ goto next;
+ }
+
+ /* deduce our operation (read, write) */
+ do_write = (rq_data_dir(rq) == WRITE);
+
+ size = blk_rq_bytes(rq);
+ ofs = blk_rq_pos(rq) * 512ULL;
+ rq_bio = rq->bio;
+ if (do_write && rbd_dev->read_only) {
+ __blk_end_request_all(rq, -EROFS);
+ goto next;
+ }
+
+ spin_unlock_irq(q->queue_lock);
+
+ dout("%s 0x%x bytes at 0x%llx\n",
+ do_write ? "write" : "read",
+ size, blk_rq_pos(rq) * 512ULL);
+
+ do {
+ /* a bio clone to be passed down to OSD req */
+ dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
+ op_size = rbd_get_segment(&rbd_dev->header,
+ rbd_dev->obj,
+ ofs, size,
+ NULL, NULL);
+ bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
+ op_size, GFP_ATOMIC);
+ if (!bio) {
+ spin_lock_irq(q->queue_lock);
+ __blk_end_request_all(rq, -ENOMEM);
+ goto next;
+ }
+
+ /* init OSD command: write or read */
+ if (do_write)
+ rbd_req_write(rq, rbd_dev,
+ rbd_dev->header.snapc,
+ ofs,
+ op_size, bio);
+ else
+ rbd_req_read(rq, rbd_dev,
+ cur_snap_id(rbd_dev),
+ ofs,
+ op_size, bio);
+
+ size -= op_size;
+ ofs += op_size;
+
+ rq_bio = next_bio;
+ } while (size > 0);
+
+ if (bp)
+ bio_pair_release(bp);
+
+ spin_lock_irq(q->queue_lock);
+next:
+ rq = blk_fetch_request(q);
+ }
+}
+
+/*
+ * a queue callback. Makes sure that we don't create a bio that spans across
+ * multiple osd objects. One exception would be with a single page bios,
+ * which we handle later at bio_chain_clone
+ */
+static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
+ struct bio_vec *bvec)
+{
+ sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
+ unsigned int chunk_sectors = (RBD_STRIPE_UNIT >> 9);
+ unsigned int bio_sectors = bmd->bi_size >> 9;
+ int max;
+
+ max = (chunk_sectors - ((sector & (chunk_sectors - 1))
+ + bio_sectors)) << 9;
+ if (max < 0)
+ max = 0; /* bio_add cannot handle a negative return */
+ if (max <= bvec->bv_len && bio_sectors == 0)
+ return bvec->bv_len;
+ return max;
+}
+
+static void rbd_free_disk(struct rbd_device *rbd_dev)
+{
+ struct gendisk *disk = rbd_dev->disk;
+
+ if (!disk)
+ return;
+
+ rbd_header_free(&rbd_dev->header);
+
+ if (disk->flags & GENHD_FL_UP)
+ del_gendisk(disk);
+ if (disk->queue)
+ blk_cleanup_queue(disk->queue);
+ put_disk(disk);
+}
+
+static char *rbd_alloc_md_name(struct rbd_device *rbd_dev, gfp_t gfp_flags)
+{
+ char *obj_md_name = kmalloc(strlen(rbd_dev->obj) + sizeof(RBD_SUFFIX),
+ gfp_flags);
+ if (!obj_md_name)
+ return NULL;
+ sprintf(obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
+
+ return obj_md_name;
+}
+
+static int rbd_read_header(struct rbd_device *rbd_dev,
+ struct rbd_obj_header *header)
+{
+ ssize_t rc;
+ char *obj_md_name;
+ struct rbd_obj_header_ondisk *dh;
+ int snap_count = 0;
+ u64 snap_names_len = 0;
+
+ obj_md_name = rbd_alloc_md_name(rbd_dev, GFP_KERNEL);
+ if (!obj_md_name)
+ return -ENOMEM;
+
+ while (1) {
+ int len = sizeof(*dh) +
+ snap_count * sizeof(struct rbd_obj_snap_ondisk) +
+ snap_names_len;
+
+ rc = -ENOMEM;
+ dh = kmalloc(len, GFP_KERNEL);
+ if (!dh)
+ goto out_obj_md;
+
+ rc = rbd_req_sync_read(rbd_dev,
+ NULL, CEPH_NOSNAP,
+ obj_md_name,
+ 0, len,
+ (char *)dh);
+ if (rc < 0)
+ goto out_dh;
+
+ rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
+ if (rc < 0)
+ goto out_dh;
+
+ if (snap_count != header->total_snaps) {
+ snap_count = header->total_snaps;
+ snap_names_len = header->snap_names_len;
+ rbd_header_free(header);
+ kfree(dh);
+ continue;
+ }
+ break;
+ }
+
+out_dh:
+ kfree(dh);
+out_obj_md:
+ kfree(obj_md_name);
+ return rc;
+}
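
rbd_read_header() cannot know the on-disk header's total length up front, because that length depends on the snapshot count and the snapshot-name bytes recorded inside the header itself. It therefore reads with its current best guess and, whenever the header it got back reports more snapshots than the guess covered, frees everything and retries with the larger size. A generic sketch of that grow-and-retry pattern; fetch() and count_in() are hypothetical stand-ins for the synchronous OSD read and for parsing the element count out of the bytes read so far:

#include <stdlib.h>

/* Illustrative only: read a variable-length record whose true length
 * depends on an element count stored inside the record itself. */
int read_growing_record(int (*fetch)(void *buf, size_t len),
                        size_t (*count_in)(const void *buf),
                        size_t fixed, size_t per_elem, void **out)
{
        size_t guess = 0;

        for (;;) {
                size_t len = fixed + guess * per_elem;
                void *buf = malloc(len);
                size_t seen;

                if (!buf)
                        return -1;
                if (fetch(buf, len) < 0) {
                        free(buf);
                        return -1;
                }
                seen = count_in(buf);
                if (seen <= guess) {            /* guess covered it all */
                        *out = buf;
                        return 0;
                }
                guess = seen;                   /* grew: retry larger */
                free(buf);
        }
}
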
+
+/*
+ * only read the first part of the ondisk header, without the snaps info
+ */
+static int rbd_read_ondisk_header_nosnap(struct rbd_device *rbd_dev,
+ struct rbd_obj_header *header,
+ struct rbd_obj_header_ondisk *dh)
+{
+ ssize_t rc;
+ char *obj_md_name;
+ int len;
+
+ obj_md_name = rbd_alloc_md_name(rbd_dev, GFP_KERNEL);
+ if (!obj_md_name)
+ return -ENOMEM;
+
+ len = sizeof(struct rbd_obj_header_ondisk);
+
+ rc = rbd_req_sync_read(rbd_dev,
+ NULL, CEPH_NOSNAP,
+ obj_md_name,
+ 0, len,
+ (char *)dh);
+ if (rc > 0 && rc < len)
+ rc = -EIO;
+
+ kfree(obj_md_name);
+ return rc;
+}
+
+static int rbd_write_header(struct rbd_device *rbd_dev,
+ struct rbd_obj_header_ondisk *dh,
+ struct rbd_obj_header *header)
+{
+ ssize_t rc;
+ char *obj_md_name;
+ int snap_count = header->total_snaps;
+ u64 snap_names_len = header->snap_names_len;
+ int len;
+
+ obj_md_name = rbd_alloc_md_name(rbd_dev, GFP_KERNEL);
+ if (!obj_md_name)
+ return -ENOMEM;
+
+ len = sizeof(*dh) +
+ snap_count * sizeof(struct rbd_obj_snap_ondisk) +
+ snap_names_len;
+
+ rc = rbd_req_sync_write(rbd_dev,
+ NULL, CEPH_NOSNAP,
+ obj_md_name,
+ 0, len,
+ (char *)dh);
+ kfree(obj_md_name);
+ return rc;
+}
+
+static int rbd_update_snaps(struct rbd_device *rbd_dev)
+{
+ int ret;
+ struct rbd_obj_header h;
+
+ ret = rbd_read_header(rbd_dev, &h);
+ if (ret < 0)
+ return ret;
+
+ down_write(&rbd_dev->header.snap_rwsem);
+
+ kfree(rbd_dev->header.snapc);
+ kfree(rbd_dev->header.snap_names);
+ kfree(rbd_dev->header.snap_sizes);
+
+ rbd_dev->header.total_snaps = h.total_snaps;
+ rbd_dev->header.snapc = h.snapc;
+ rbd_dev->header.snap_names = h.snap_names;
+ rbd_dev->header.snap_sizes = h.snap_sizes;
+
+ up_write(&rbd_dev->header.snap_rwsem);
+
+ return 0;
+}
+
+static int rbd_init_disk(struct rbd_device *rbd_dev)
+{
+ struct gendisk *disk;
+ struct request_queue *q;
+ int rc;
+ u64 total_size;
+ const char *snap = NULL;
+
+ /* contact OSD, request size info about the object being mapped */
+ rc = rbd_read_header(rbd_dev, &rbd_dev->header);
+ if (rc)
+ return rc;
+
+ if (rbd_dev->client->mount_args)
+ snap = rbd_dev->client->mount_args->snap;
+ rc = rbd_header_set_snap(rbd_dev, snap, &total_size);
+ if (rc)
+ return rc;
+
+ /* create gendisk info */
+ rc = -ENOMEM;
+ disk = alloc_disk(RBD_MINORS_PER_MAJOR);
+ if (!disk)
+ goto out;
+
+ sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
+ disk->major = rbd_dev->major;
+ disk->first_minor = 0;
+ disk->fops = &rbd_bd_ops;
+ disk->private_data = rbd_dev;
+
+ /* init rq */
+ rc = -ENOMEM;
+ q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
+ if (!q)
+ goto out_disk;
+ blk_queue_merge_bvec(q, rbd_merge_bvec);
+ disk->queue = q;
+
+ q->queuedata = rbd_dev;
+
+ rbd_dev->disk = disk;
+ rbd_dev->q = q;
+
+ /* finally, announce the disk to the world */
+ set_capacity(disk, total_size / 512ULL);
+ add_disk(disk);
+
+ pr_info("%s: added with size 0x%llx\n",
+ disk->disk_name, (unsigned long long)total_size);
+ return 0;
+
+out_disk:
+ put_disk(disk);
+out:
+ return rc;
+}
+
+/********************************************************************
+ * /sys/class/rbd/
+ * add map rados objects to blkdev
+ * remove unmap rados objects
+ * list show mappings
+ *******************************************************************/
+
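
The control files take single-line text commands. Going by the sscanf() formats below, 'add' expects a monitor address, an option string, a pool name and an image (object) name; 'snap_create' takes a device id and a snapshot name; 'snaps_refresh' and 'remove' take just the numeric id that 'list' reports. A hedged user-space sketch of driving the interface; the monitor address, option string, pool and image names are invented examples, and the exact option syntax is whatever parse_mount_args() accepts:

#include <stdio.h>

/* Illustrative only: write one command string to an rbd control file. */
static int write_ctl(const char *file, const char *cmd)
{
        FILE *f = fopen(file, "w");

        if (!f)
                return -1;
        fputs(cmd, f);
        return fclose(f);
}

int main(void)
{
        /* map image "myimage" from pool "rbd": mon addr, options, pool, image */
        if (write_ctl("/sys/class/rbd/add",
                      "1.2.3.4:6789 name=admin,secret=XXXX rbd myimage") < 0)
                return 1;

        /* first mapping gets id 0; snapshot it and re-read its snap context */
        write_ctl("/sys/class/rbd/snap_create", "0 snapA");
        write_ctl("/sys/class/rbd/snaps_refresh", "0");

        /* unmap device id 0 again */
        write_ctl("/sys/class/rbd/remove", "0");
        return 0;
}
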
+static void class_rbd_release(struct class *cls)
+{
+ kfree(cls);
+}
+
+static ssize_t class_rbd_list(struct class *c,
+ struct class_attribute *attr,
+ char *data)
+{
+ int n = 0;
+ struct list_head *tmp;
+
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+ list_for_each(tmp, &rbddev_list) {
+ struct rbd_device *rbd_dev;
+
+ rbd_dev = list_entry(tmp, struct rbd_device, node);
+ n += sprintf(data+n, "%d %d client%lld %s %s\n",
+ rbd_dev->id,
+ rbd_dev->major,
+ ceph_client_id(rbd_dev->client),
+ rbd_dev->pool_name,
+ rbd_dev->obj);
+ }
+
+ mutex_unlock(&ctl_mutex);
+ return n;
+}
+
+static ssize_t class_rbd_add(struct class *c,
+ struct class_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct rbd_device *rbd_dev;
+ ssize_t rc = -ENOMEM;
+ int irc, new_id = 0;
+ struct list_head *tmp;
+ char *mon_dev_name;
+ char *opt;
+
+ if (!try_module_get(THIS_MODULE))
+ return -ENODEV;
+
+ mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
+ if (!mon_dev_name)
+ goto err_out_mod;
+
+ opt = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
+ if (!opt)
+ goto err_mon_dev;
+
+ /* new rbd_device object */
+ rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
+ if (!rbd_dev)
+ goto err_out_opt;
+
+ /* static rbd_device initialization */
+ spin_lock_init(&rbd_dev->lock);
+ INIT_LIST_HEAD(&rbd_dev->node);
+
+ /* generate unique id: find highest unique id, add one */
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+ list_for_each(tmp, &rbddev_list) {
+ struct rbd_device *rbd_dev;
+
+ rbd_dev = list_entry(tmp, struct rbd_device, node);
+ if (rbd_dev->id >= new_id)
+ new_id = rbd_dev->id + 1;
+ }
+
+ rbd_dev->id = new_id;
+
+ /* add to global list */
+ list_add_tail(&rbd_dev->node, &rbddev_list);
+
+ /* parse add command */
+ if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
+ "%" __stringify(RBD_MAX_OPT_LEN) "s "
+ "%" __stringify(RBD_MAX_POOL_NAME_SIZE) "s "
+ "%" __stringify(RBD_MAX_OBJ_NAME_SIZE) "s",
+ mon_dev_name, opt, rbd_dev->pool_name,
+ rbd_dev->obj) != 4) {
+ rc = -EINVAL;
+ goto err_out_slot;
+ }
+
+ rbd_dev->obj_len = strlen(rbd_dev->obj);
+
+ /* initialize rest of new object */
+ snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
+ rc = rbd_get_client(rbd_dev, mon_dev_name, opt);
+ if (rc < 0)
+ goto err_out_slot;
+
+ mutex_unlock(&ctl_mutex);
+ /* register our block device */
+ irc = register_blkdev(0, rbd_dev->name);
+ if (irc < 0) {
+ rc = irc;
+ goto err_out_client;
+ }
+ rbd_dev->major = irc;
+
+ /* set up and announce blkdev mapping */
+ rc = rbd_init_disk(rbd_dev);
+ if (rc)
+ goto err_out_blkdev;
+
+ return count;
+
+err_out_blkdev:
+ unregister_blkdev(rbd_dev->major, rbd_dev->name);
+err_out_client:
+ rbd_put_client(rbd_dev);
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+err_out_slot:
+ list_del_init(&rbd_dev->node);
+ mutex_unlock(&ctl_mutex);
+
+ kfree(rbd_dev);
+err_out_opt:
+ kfree(opt);
+err_mon_dev:
+ kfree(mon_dev_name);
+err_out_mod:
+ dout("Error adding device %s\n", buf);
+ module_put(THIS_MODULE);
+ return rc;
+}
+
+static struct rbd_device *__rbd_get_dev(unsigned long id)
+{
+ struct list_head *tmp;
+ struct rbd_device *rbd_dev = NULL;
+
+ list_for_each(tmp, &rbddev_list) {
+ rbd_dev = list_entry(tmp, struct rbd_device, node);
+ if (rbd_dev->id == id)
+ break;
+
+ rbd_dev = NULL;
+ }
+
+ return rbd_dev;
+}
+
+static ssize_t class_rbd_remove(struct class *c,
+ struct class_attribute *attr,
+ const char *buf,
+ size_t count)
+{
+ struct rbd_device *rbd_dev = NULL;
+ int target_id, rc;
+ unsigned long ul;
+
+ rc = strict_strtoul(buf, 10, &ul);
+ if (rc)
+ return rc;
+
+ /* convert to int; abort if we lost anything in the conversion */
+ target_id = (int) ul;
+ if (target_id != ul)
+ return -EINVAL;
+
+ /* remove object from list immediately */
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+ rbd_dev = __rbd_get_dev(target_id);
+ if (rbd_dev)
+ list_del_init(&rbd_dev->node);
+
+ mutex_unlock(&ctl_mutex);
+
+ if (!rbd_dev)
+ return -ENOENT;
+
+ rbd_put_client(rbd_dev);
+
+ /* clean up and free blkdev and associated OSD connection */
+ rbd_free_disk(rbd_dev);
+ unregister_blkdev(rbd_dev->major, rbd_dev->name);
+ kfree(rbd_dev);
+
+ /* release module ref */
+ module_put(THIS_MODULE);
+
+ return count;
+}
+
+static void get_size_and_suffix(u64 orig_size, u64 *size, char *suffix)
+{
+ if (orig_size >= 1024*1024*1024) {
+ *size = orig_size / (1024*1024*1024);
+ *suffix = 'G';
+ } else if (orig_size >= 1024*1024) {
+ *size = orig_size / (1024*1024);
+ *suffix = 'M';
+ } else if (orig_size >= 1024) {
+ *size = orig_size / 1024;
+ *suffix = 'K';
+ } else {
+ *size = orig_size;
+ *suffix = ' ';
+ }
+}
+
+static ssize_t class_rbd_snaps_list(struct class *c,
+ struct class_attribute *attr,
+ char *data)
+{
+ struct rbd_device *rbd_dev = NULL;
+ struct list_head *tmp;
+ struct rbd_obj_header *header;
+ char size_suffix;
+ u64 size;
+ int i, n = 0, max = PAGE_SIZE;
+ int ret;
+
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+ list_for_each(tmp, &rbddev_list) {
+ char *names, *p;
+ struct ceph_snap_context *snapc;
+
+ rbd_dev = list_entry(tmp, struct rbd_device, node);
+ header = &rbd_dev->header;
+ names = header->snap_names;
+ snapc = header->snapc;
+ n += snprintf(data + n, max - n,
+ "snapshots for device id %d:\n",
+ rbd_dev->id);
+ if (n == max)
+ break;
+
+ down_read(&header->snap_rwsem);
+
+ get_size_and_suffix(header->image_size, &size,
+ &size_suffix);
+ n += snprintf(data + n, max - n, "%s\t%lld%c%s\n",
+ RBD_SNAP_HEAD_NAME,
+ size, size_suffix,
+ (!rbd_dev->cur_snap ?
+ " (*)" : ""));
+ if (n == max)
+ break;
+
+ p = names;
+ for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
+ get_size_and_suffix(header->snap_sizes[i], &size,
+ &size_suffix);
+ n += snprintf(data + n, max - n, "%s\t%lld%c%s\n",
+ p, size, size_suffix,
+ (rbd_dev->cur_snap &&
+ (snap_index(header, i) == rbd_dev->cur_snap) ?
+ " (*)" : ""));
+ if (n == max)
+ break;
+ }
+
+ up_read(&header->snap_rwsem);
+ }
+
+ ret = n;
+ mutex_unlock(&ctl_mutex);
+ return ret;
+}
+
+static ssize_t class_rbd_snaps_refresh(struct class *c,
+ struct class_attribute *attr,
+ const char *buf,
+ size_t count)
+{
+ struct rbd_device *rbd_dev = NULL;
+ int target_id, rc;
+ unsigned long ul;
+ int ret = count;
+
+ rc = strict_strtoul(buf, 10, &ul);
+ if (rc)
+ return rc;
+
+ /* convert to int; abort if we lost anything in the conversion */
+ target_id = (int) ul;
+ if (target_id != ul)
+ return -EINVAL;
+
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+ rbd_dev = __rbd_get_dev(target_id);
+ if (!rbd_dev) {
+ ret = -ENOENT;
+ goto done;
+ }
+
+ rc = rbd_update_snaps(rbd_dev);
+ if (rc < 0)
+ ret = rc;
+
+done:
+ mutex_unlock(&ctl_mutex);
+ return ret;
+}
+
+static ssize_t class_rbd_snaps_op(struct class *c,
+ struct class_attribute *attr,
+ const char *buf,
+ size_t count,
+ int snaps_op)
+{
+ struct rbd_device *rbd_dev = NULL;
+ int target_id, ret;
+ char *name;
+ struct rbd_obj_header_ondisk old_ondisk, *new_ondisk;
+
+ name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL);
+ if (!name)
+ return -ENOMEM;
+
+ /* parse snaps add command */
+ if (sscanf(buf, "%d "
+ "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
+ &target_id,
+ name) != 2) {
+ ret = -EINVAL;
+ goto done;
+ }
+
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+ rbd_dev = __rbd_get_dev(target_id);
+ if (!rbd_dev) {
+ ret = -ENOENT;
+ goto done_unlock;
+ }
+
+ ret = rbd_read_ondisk_header_nosnap(rbd_dev,
+ &rbd_dev->header,
+ &old_ondisk);
+ if (ret < 0)
+ goto done_unlock;
+
+ switch (snaps_op) {
+ case RBD_SNAP_OP_CREATE:
+ ret = rbd_header_add_snap(rbd_dev,
+ name, GFP_KERNEL);
+ break;
+ case RBD_SNAP_OP_SET:
+ ret = rbd_header_set_snap(rbd_dev, name, NULL);
+ break;
+ default:
+ ret = -EINVAL;
+ }
+ if (ret < 0)
+ goto done_unlock;
+
+ ret = rbd_header_to_disk(&new_ondisk, &old_ondisk,
+ &rbd_dev->header, GFP_KERNEL);
+ if (ret < 0)
+ goto done_unlock;
+
+ ret = rbd_write_header(rbd_dev, new_ondisk, &rbd_dev->header);
+ if (ret < 0)
+ goto done_unlock;
+
+ ret = count;
+done_unlock:
+ mutex_unlock(&ctl_mutex);
+done:
+ kfree(name);
+ return ret;
+}
+
+static ssize_t class_rbd_snap_create(struct class *c,
+ struct class_attribute *attr,
+ const char *buf,
+ size_t count)
+{
+ return class_rbd_snaps_op(c, attr, buf, count,
+ RBD_SNAP_OP_CREATE);
+}
+
+static struct class_attribute class_rbd_attrs[] = {
+ __ATTR(add, 0200, NULL, class_rbd_add),
+ __ATTR(remove, 0200, NULL, class_rbd_remove),
+ __ATTR(list, 0444, class_rbd_list, NULL),
+ __ATTR(snaps_refresh, 0200, NULL, class_rbd_snaps_refresh),
+ __ATTR(snap_create, 0200, NULL, class_rbd_snap_create),
+ __ATTR(snaps_list, 0444, class_rbd_snaps_list, NULL),
+ __ATTR_NULL
+};
+
+/*
+ * create control files in sysfs
+ * /sys/class/rbd/...
+ */
+static int rbd_sysfs_init(void)
+{
+ int ret = -ENOMEM;
+
+ class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
+ if (!class_rbd)
+ goto out;
+
+ class_rbd->name = DRV_NAME;
+ class_rbd->owner = THIS_MODULE;
+ class_rbd->class_release = class_rbd_release;
+ class_rbd->class_attrs = class_rbd_attrs;
+
+ ret = class_register(class_rbd);
+ if (ret)
+ goto out_class;
+ return 0;
+
+out_class:
+ kfree(class_rbd);
+ class_rbd = NULL;
+ pr_err(DRV_NAME ": failed to create class rbd\n");
+out:
+ return ret;
+}
+
+static void rbd_sysfs_cleanup(void)
+{
+ if (class_rbd)
+ class_destroy(class_rbd);
+ class_rbd = NULL;
+}
+
+int __init rbd_init(void)
+{
+ int rc;
+
+ rc = rbd_sysfs_init();
+ if (rc)
+ return rc;
+ spin_lock_init(&node_lock);
+	pr_info("loaded " DRV_NAME_LONG "\n");
+ return 0;
+}
+
+void __exit rbd_exit(void)
+{
+ rbd_sysfs_cleanup();
+}
diff --git a/fs/ceph/rbd.h b/fs/ceph/rbd.h
new file mode 100644
index 000000000000..68e0a5c69dc8
--- /dev/null
+++ b/fs/ceph/rbd.h
@@ -0,0 +1,8 @@
+#ifndef _FS_CEPH_RBD
+#define _FS_CEPH_RBD
+
+extern void rbd_set_osdc(struct ceph_osd_client *o);
+extern int __init rbd_init(void);
+extern void __exit rbd_exit(void);
+
+#endif
diff --git a/fs/ceph/rbd_types.h b/fs/ceph/rbd_types.h
new file mode 100644
index 000000000000..b73ac12ad0d6
--- /dev/null
+++ b/fs/ceph/rbd_types.h
@@ -0,0 +1,48 @@
+#ifndef _FS_CEPH_RBD_TYPES
+#define _FS_CEPH_RBD_TYPES
+
+#include <linux/types.h>
+
+/*
+ * rbd image 'foo' consists of objects
+ * foo.rbd - image metadata
+ * foo.00000000
+ * foo.00000001
+ * ... - data
+ */
+
+#define RBD_SUFFIX ".rbd"
+#define RBD_DIRECTORY "rbd_directory"
+
+#define RBD_DEFAULT_OBJ_ORDER 22 /* 4MB */
+
+#define RBD_MAX_OBJ_NAME_SIZE 96
+#define RBD_MAX_SEG_NAME_SIZE 128
+
+#define RBD_COMP_NONE 0
+#define RBD_CRYPT_NONE 0
+
+static const char rbd_text[] = "<<< Rados Block Device Image >>>\n";
+static const char rbd_signature[] = "RBD";
+static const char rbd_version[] = "001.002";
+
+struct rbd_obj_snap_ondisk {
+ __le64 id;
+ __le64 image_size;
+} __attribute__((packed));
+
+struct rbd_obj_header_ondisk {
+ char text[64];
+ char signature[4];
+ char version[8];
+ __le64 image_size;
+ __u8 obj_order;
+ __u8 crypt_type;
+ __u8 comp_type;
+ __le64 snap_seq;
+ __le32 snap_count;
+ __le64 snap_names_len;
+ struct rbd_obj_snap_ondisk snaps[0];
+} __attribute__((packed));
+
+#endif
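
Every multi-byte field in rbd_obj_header_ondisk is little-endian and the struct is packed, so a reader has to decode it explicitly rather than trusting native layout. A stand-alone sketch of that decoding under those assumptions; the byte-order helpers are plain user-space substitutes for the kernel's le64_to_cpu()/le32_to_cpu(), and the offsets simply follow the packed layout above:

#include <stdint.h>
#include <stdio.h>

/* Illustrative only: pull the interesting fields out of a raw
 * little-endian header buffer. */
static uint64_t get_le64(const unsigned char *p)
{
        uint64_t v = 0;
        int i;

        for (i = 7; i >= 0; i--)
                v = (v << 8) | p[i];
        return v;
}

static uint32_t get_le32(const unsigned char *p)
{
        return p[0] | (p[1] << 8) | (p[2] << 16) | ((uint32_t)p[3] << 24);
}

int main(void)
{
        /* 64 + 4 + 8 bytes of text/signature/version, then image_size,
         * obj_order, crypt_type, comp_type, snap_seq, snap_count,
         * snap_names_len, exactly as declared above */
        unsigned char hdr[64 + 4 + 8 + 8 + 3 + 8 + 4 + 8] = { 0 };
        size_t off = 64 + 4 + 8;

        printf("image_size=%llu obj_order=%u snap_count=%u\n",
               (unsigned long long)get_le64(hdr + off),
               hdr[off + 8],
               get_le32(hdr + off + 8 + 3 + 8));
        return 0;
}
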
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 4e0bee240b9d..c14d2623c9b5 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -18,6 +18,7 @@
#include "super.h"
#include "mon_client.h"
#include "auth.h"
+#include "rbd.h"
/*
* Ceph superblock operations
@@ -342,6 +343,7 @@ enum {
Opt_snapdirname,
Opt_name,
Opt_secret,
+ Opt_snap,
Opt_last_string,
/* string args above */
Opt_ip,
@@ -374,6 +376,7 @@ static match_table_t arg_tokens = {
{Opt_snapdirname, "snapdirname=%s"},
{Opt_name, "name=%s"},
{Opt_secret, "secret=%s"},
+ {Opt_snap, "snap=%s"},
/* string args above */
{Opt_ip, "ip=%s"},
{Opt_noshare, "noshare"},
@@ -387,14 +390,15 @@ static match_table_t arg_tokens = {
};
-static struct ceph_mount_args *parse_mount_args(int flags, char *options,
- const char *dev_name,
- const char **path)
+struct ceph_mount_args *parse_mount_args(int flags, char *options,
+ const char *dev_name,
+ const char **path)
{
struct ceph_mount_args *args;
const char *c;
int err = -ENOMEM;
substring_t argstr[MAX_OPT_ARGS];
+ const char *end_path;
args = kzalloc(sizeof(*args), GFP_KERNEL);
if (!args)
@@ -426,23 +430,29 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
err = -EINVAL;
if (!dev_name)
goto out;
- *path = strstr(dev_name, ":/");
- if (*path == NULL) {
- pr_err("device name is missing path (no :/ in %s)\n",
- dev_name);
- goto out;
+
+ if (path) {
+ *path = strstr(dev_name, ":/");
+ if (*path == NULL) {
+ pr_err("device name is missing path (no :/ in %s)\n",
+ dev_name);
+ goto out;
+ }
+ end_path = *path;
+
+ /* path on server */
+ *path += 2;
+ dout("server path '%s'\n", *path);
+ } else {
+ end_path = dev_name + strlen(dev_name);
}
/* get mon ip(s) */
- err = ceph_parse_ips(dev_name, *path, args->mon_addr,
+ err = ceph_parse_ips(dev_name, end_path, args->mon_addr,
CEPH_MAX_MON, &args->num_mon);
if (err < 0)
goto out;
- /* path on server */
- *path += 2;
- dout("server path '%s'\n", *path);
-
/* parse mount options */
while ((c = strsep(&options, ",")) != NULL) {
int token, intval, ret;
@@ -501,6 +511,11 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
argstr[0].to-argstr[0].from,
GFP_KERNEL);
break;
+ case Opt_snap:
+ args->snap = kstrndup(argstr[0].from,
+ argstr[0].to-argstr[0].from,
+ GFP_KERNEL);
+ break;
/* misc */
case Opt_wsize:
@@ -569,22 +584,70 @@ out:
return ERR_PTR(err);
}
-static void destroy_mount_args(struct ceph_mount_args *args)
+void ceph_destroy_mount_args(struct ceph_mount_args *args)
{
dout("destroy_mount_args %p\n", args);
kfree(args->snapdir_name);
- args->snapdir_name = NULL;
kfree(args->name);
- args->name = NULL;
kfree(args->secret);
- args->secret = NULL;
+ kfree(args->snap);
kfree(args);
}
+static int strcmp_null(const char *s1, const char *s2)
+{
+ if (!s1 && !s2)
+ return 0;
+ if (s1 && !s2)
+ return -1;
+ if (!s1 && s2)
+ return 1;
+ return strcmp(s1, s2);
+}
+
+int ceph_compare_mount_args(struct ceph_mount_args *new_args,
+ struct ceph_client *client)
+{
+ struct ceph_mount_args *args1 = new_args;
+ struct ceph_mount_args *args2 = client->mount_args;
+ int ofs = offsetof(struct ceph_mount_args, mon_addr);
+ int i;
+ int ret;
+
+ ret = memcmp(args1, args2, ofs);
+ if (ret)
+ return ret;
+
+ ret = strcmp_null(args1->snapdir_name, args2->snapdir_name);
+ if (ret)
+ return ret;
+
+ ret = strcmp_null(args1->name, args2->name);
+ if (ret)
+ return ret;
+
+ ret = strcmp_null(args1->secret, args2->secret);
+ if (ret)
+ return ret;
+
+ ret = strcmp_null(args1->snap, args2->snap);
+ if (ret)
+ return ret;
+
+ for (i = 0; i < args1->num_mon; i++) {
+ if (ceph_monmap_contains(client->monc.monmap,
+ &args1->mon_addr[i]))
+ return 0;
+ }
+
+ return -1;
+}
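
ceph_compare_mount_args() memcmp()s the flat prefix of the args struct up to mon_addr, then compares the pointer-backed strings one by one with strcmp_null() and finishes with a monitor-address check; this only works because the struct definition in super.h keeps every pointer member grouped after that offset. A toy sketch of the same pattern on a made-up struct, just to show the layout requirement:

#include <stddef.h>
#include <string.h>

/* Illustrative only: compare the directly comparable members with one
 * memcmp(), then handle the string members by hand.  Correct only
 * because every pointer field sits after `name`. */
struct opts {
        int flags;
        int timeout;
        /* pointer-backed fields from here on; keep them last */
        char *name;
        char *secret;
};

/* same semantics as the patch's strcmp_null() */
static int strcmp_null(const char *a, const char *b)
{
        if (!a && !b)
                return 0;
        if (!a || !b)
                return a ? -1 : 1;
        return strcmp(a, b);
}

static int opts_compare(const struct opts *a, const struct opts *b)
{
        int ret = memcmp(a, b, offsetof(struct opts, name));

        if (ret)
                return ret;
        ret = strcmp_null(a->name, b->name);
        if (ret)
                return ret;
        return strcmp_null(a->secret, b->secret);
}

int main(void)
{
        struct opts a = { 1, 30, "ceph", NULL };
        struct opts b = { 1, 30, "ceph", NULL };

        return opts_compare(&a, &b);    /* 0: identical */
}
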
+
/*
* create a fresh client instance
*/
-static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
+struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
+ int need_mdsc)
{
struct ceph_client *client;
int err = -ENOMEM;
@@ -639,9 +702,13 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
err = ceph_osdc_init(&client->osdc, client);
if (err < 0)
goto fail_monc;
- err = ceph_mdsc_init(&client->mdsc, client);
- if (err < 0)
- goto fail_osdc;
+ if (need_mdsc) {
+ err = ceph_mdsc_init(&client->mdsc, client);
+ if (err < 0)
+ goto fail_osdc;
+ client->have_mdsc = 1;
+ }
+
return client;
fail_osdc:
@@ -663,7 +730,12 @@ fail:
return ERR_PTR(err);
}
-static void ceph_destroy_client(struct ceph_client *client)
+u64 ceph_client_id(struct ceph_client *client)
+{
+ return client->monc.auth->global_id;
+}
+
+void ceph_destroy_client(struct ceph_client *client)
{
dout("destroy_client %p\n", client);
@@ -693,7 +765,7 @@ static void ceph_destroy_client(struct ceph_client *client)
ceph_messenger_destroy(client->msgr);
mempool_destroy(client->wb_pagevec_pool);
- destroy_mount_args(client->mount_args);
+ ceph_destroy_mount_args(client->mount_args);
kfree(client);
dout("destroy_client %p done\n", client);
@@ -712,7 +784,7 @@ int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid)
}
} else {
pr_info("client%lld fsid " FSID_FORMAT "\n",
- client->monc.auth->global_id, PR_FSID(fsid));
+ ceph_client_id(client), PR_FSID(fsid));
memcpy(&client->fsid, fsid, sizeof(*fsid));
ceph_debugfs_client_init(client);
client->have_fsid = true;
@@ -774,17 +846,12 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
/*
* mount: join the ceph cluster, and open root directory.
*/
-static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
- const char *path)
+static int __ceph_open_session(struct ceph_client *client,
+ unsigned long started)
{
struct ceph_entity_addr *myaddr = NULL;
int err;
unsigned long timeout = client->mount_args->mount_timeout * HZ;
- unsigned long started = jiffies; /* note the start time */
- struct dentry *root;
-
- dout("mount start\n");
- mutex_lock(&client->mount_mutex);
/* initialize the messenger */
if (client->msgr == NULL) {
@@ -792,9 +859,8 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
myaddr = &client->mount_args->my_addr;
client->msgr = ceph_messenger_create(myaddr);
if (IS_ERR(client->msgr)) {
- err = PTR_ERR(client->msgr);
client->msgr = NULL;
- goto out;
+ return PTR_ERR(client->msgr);
}
client->msgr->nocrc = ceph_test_opt(client, NOCRC);
}
@@ -802,26 +868,58 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
/* open session, and wait for mon, mds, and osd maps */
err = ceph_monc_open_session(&client->monc);
if (err < 0)
- goto out;
+ return err;
while (!have_mon_and_osd_map(client)) {
err = -EIO;
if (timeout && time_after_eq(jiffies, started + timeout))
- goto out;
+ return err;
/* wait */
dout("mount waiting for mon_map\n");
err = wait_event_interruptible_timeout(client->auth_wq,
- have_mon_and_osd_map(client) || (client->auth_err < 0),
- timeout);
+ have_mon_and_osd_map(client) || (client->auth_err < 0),
+ timeout);
if (err == -EINTR || err == -ERESTARTSYS)
- goto out;
- if (client->auth_err < 0) {
- err = client->auth_err;
- goto out;
- }
+ return err;
+ if (client->auth_err < 0)
+ return client->auth_err;
}
+ return 0;
+}
+
+int ceph_open_session(struct ceph_client *client)
+{
+ int ret;
+ unsigned long started = jiffies; /* note the start time */
+
+ dout("open_session start\n");
+ mutex_lock(&client->mount_mutex);
+
+ ret = __ceph_open_session(client, started);
+
+ mutex_unlock(&client->mount_mutex);
+ return ret;
+}
+
+/*
+ * mount: join the ceph cluster, and open root directory.
+ */
+static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
+ const char *path)
+{
+ int err;
+ unsigned long started = jiffies; /* note the start time */
+ struct dentry *root;
+
+ dout("mount start\n");
+ mutex_lock(&client->mount_mutex);
+
+ err = __ceph_open_session(client, started);
+ if (err < 0)
+ goto out;
+
dout("mount opening root\n");
root = open_root_dentry(client, "", started);
if (IS_ERR(root)) {
@@ -963,7 +1061,7 @@ static int ceph_get_sb(struct file_system_type *fs_type,
}
/* create client (which we may/may not use) */
- client = ceph_create_client(args);
+ client = ceph_create_client(args, 1);
if (IS_ERR(client)) {
err = PTR_ERR(client);
goto out_final;
@@ -1053,8 +1151,14 @@ static int __init init_ceph(void)
CEPH_MONC_PROTOCOL, CEPH_MDSC_PROTOCOL, CEPH_OSDC_PROTOCOL,
CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT,
CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT);
+
+ ret = rbd_init();
+ if (ret)
+ goto out_fs;
return 0;
+out_fs:
+ unregister_filesystem(&ceph_fs_type);
out_icache:
destroy_caches();
out_msgr:
@@ -1068,6 +1172,7 @@ out:
static void __exit exit_ceph(void)
{
dout("exit_ceph\n");
+ rbd_exit();
unregister_filesystem(&ceph_fs_type);
ceph_caps_finalize();
destroy_caches();
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 10a4a406e887..b46de1f40ec9 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -48,14 +48,11 @@
#define ceph_test_opt(client, opt) \
(!!((client)->mount_args->flags & CEPH_OPT_##opt))
-
struct ceph_mount_args {
int sb_flags;
int flags;
struct ceph_fsid fsid;
struct ceph_entity_addr my_addr;
- int num_mon;
- struct ceph_entity_addr *mon_addr;
int mount_timeout;
int osd_idle_ttl;
int osd_timeout;
@@ -67,9 +64,17 @@ struct ceph_mount_args {
int cap_release_safety;
int max_readdir; /* max readdir result (entires) */
int max_readdir_bytes; /* max readdir result (bytes) */
+
+	/* any type that can't be simply compared or doesn't need
+	   to be compared should go beyond this point,
+ ceph_compare_mount_args() should be updated accordingly */
+ struct ceph_entity_addr *mon_addr; /* should be the first
+ pointer type of args */
+ int num_mon;
char *snapdir_name; /* default ".snap" */
char *name;
char *secret;
+ char *snap; /* rbd snapshot */
};
/*
@@ -139,6 +144,8 @@ struct ceph_client {
int min_caps; /* min caps i added */
+ int have_mdsc;
+
struct ceph_messenger *msgr; /* messenger instance */
struct ceph_mon_client monc;
struct ceph_mds_client mdsc;
@@ -737,6 +744,17 @@ extern struct kmem_cache *ceph_file_cachep;
extern const char *ceph_msg_type_name(int type);
extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
+extern struct ceph_mount_args *parse_mount_args(int flags, char *options,
+ const char *dev_name,
+ const char **path);
+extern void ceph_destroy_mount_args(struct ceph_mount_args *args);
+extern int ceph_compare_mount_args(struct ceph_mount_args *new_args,
+ struct ceph_client *client);
+extern struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
+ int need_mdsc);
+extern u64 ceph_client_id(struct ceph_client *client);
+extern void ceph_destroy_client(struct ceph_client *client);
+extern int ceph_open_session(struct ceph_client *client);
#define FSID_FORMAT "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-" \
"%02x%02x%02x%02x%02x%02x"
@@ -847,6 +865,13 @@ extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
/* file.c */
extern const struct file_operations ceph_file_fops;
extern const struct address_space_operations ceph_aops;
+extern int ceph_copy_to_page_vector(struct page **pages,
+ const char *data,
+ loff_t off, size_t len);
+extern int ceph_copy_from_page_vector(struct page **pages,
+ char *data,
+ loff_t off, size_t len);
+extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags);
extern int ceph_open(struct inode *inode, struct file *file);
extern struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
struct nameidata *nd, int mode,
@@ -896,4 +921,9 @@ static inline struct inode *get_dentry_parent_inode(struct dentry *dentry)
return NULL;
}
+#ifndef CONFIG_RBD
+static inline int __init rbd_init(void) { return 0; }
+static inline void __exit rbd_exit(void) {}
+#endif
+
#endif /* _FS_CEPH_SUPER_H */