diff options
Diffstat (limited to 'fs/ceph/file.c')
-rw-r--r-- | fs/ceph/file.c | 336 |
1 files changed, 177 insertions, 159 deletions
diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 988d4f302e48..2ddf061c1c4a 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -4,8 +4,10 @@ #include <linux/sched.h> #include <linux/slab.h> #include <linux/file.h> +#include <linux/mount.h> #include <linux/namei.h> #include <linux/writeback.h> +#include <linux/aio.h> #include "super.h" #include "mds_client.h" @@ -106,9 +108,6 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) } /* - * If the filp already has private_data, that means the file was - * already opened by intent during lookup, and we do nothing. - * * If we already have the requisite capabilities, we can satisfy * the open request locally (no need to request new caps from the * MDS). We do, however, need to inform the MDS (asynchronously) @@ -207,36 +206,34 @@ out: /* - * Do a lookup + open with a single request. - * - * If this succeeds, but some subsequent check in the vfs - * may_open() fails, the struct *file gets cleaned up (i.e. - * ceph_release gets called). So fear not! - */ -/* - * flags - * path_lookup_open -> LOOKUP_OPEN - * path_lookup_create -> LOOKUP_OPEN|LOOKUP_CREATE + * Do a lookup + open with a single request. If we get a non-existent + * file or symlink, return 1 so the VFS can retry. */ -struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry, - struct nameidata *nd, int mode, - int locked_dir) +int ceph_atomic_open(struct inode *dir, struct dentry *dentry, + struct file *file, unsigned flags, umode_t mode, + int *opened) { struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; - struct file *file; struct ceph_mds_request *req; - struct dentry *ret; + struct dentry *dn; int err; - int flags = nd->intent.open.flags; - dout("ceph_lookup_open dentry %p '%.*s' flags %d mode 0%o\n", - dentry, dentry->d_name.len, dentry->d_name.name, flags, mode); + dout("atomic_open %p dentry %p '%.*s' %s flags %d mode 0%o\n", + dir, dentry, dentry->d_name.len, dentry->d_name.name, + d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode); + + if (dentry->d_name.len > NAME_MAX) + return -ENAMETOOLONG; + + err = ceph_init_dentry(dentry); + if (err < 0) + return err; /* do the open */ req = prepare_open_request(dir->i_sb, flags, mode); if (IS_ERR(req)) - return ERR_CAST(req); + return PTR_ERR(req); req->r_dentry = dget(dentry); req->r_num_caps = 2; if (flags & O_CREAT) { @@ -247,21 +244,39 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry, err = ceph_mdsc_do_request(mdsc, (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, req); - err = ceph_handle_snapdir(req, dentry, err); if (err) - goto out; - if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry) + goto out_err; + + err = ceph_handle_snapdir(req, dentry, err); + if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); + + if (d_unhashed(dentry)) { + dn = ceph_finish_lookup(req, dentry, err); + if (IS_ERR(dn)) + err = PTR_ERR(dn); + } else { + /* we were given a hashed negative dentry */ + dn = NULL; + } if (err) - goto out; - file = lookup_instantiate_filp(nd, req->r_dentry, ceph_open); - if (IS_ERR(file)) - err = PTR_ERR(file); -out: - ret = ceph_finish_lookup(req, dentry, err); + goto out_err; + if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) { + /* make vfs retry on splice, ENOENT, or symlink */ + dout("atomic_open finish_no_open on dn %p\n", dn); + err = finish_no_open(file, dn); + } else { + dout("atomic_open finish_open on dn %p\n", dn); + if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) { + *opened |= FILE_CREATED; + } + err = finish_open(file, dentry, ceph_open, opened); + } + +out_err: ceph_mdsc_put_request(req); - dout("ceph_lookup_open result=%p\n", ret); - return ret; + dout("atomic_open result=%d\n", err); + return err; } int ceph_release(struct inode *inode, struct file *file) @@ -385,7 +400,7 @@ more: static ssize_t ceph_sync_read(struct file *file, char __user *data, unsigned len, loff_t *poff, int *checkeof) { - struct inode *inode = file->f_dentry->d_inode; + struct inode *inode = file_inode(file); struct page **pages; u64 off = *poff; int num_pages, ret; @@ -432,19 +447,35 @@ done: } /* - * Write commit callback, called if we requested both an ACK and - * ONDISK commit reply from the OSD. + * Write commit request unsafe callback, called to tell us when a + * request is unsafe (that is, in flight--has been handed to the + * messenger to send to its target osd). It is called again when + * we've received a response message indicating the request is + * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request + * is completed early (and unsuccessfully) due to a timeout or + * interrupt. + * + * This is used if we requested both an ACK and ONDISK commit reply + * from the OSD. */ -static void sync_write_commit(struct ceph_osd_request *req, - struct ceph_msg *msg) +static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) { struct ceph_inode_info *ci = ceph_inode(req->r_inode); - dout("sync_write_commit %p tid %llu\n", req, req->r_tid); - spin_lock(&ci->i_unsafe_lock); - list_del_init(&req->r_unsafe_item); - spin_unlock(&ci->i_unsafe_lock); - ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR); + dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid, + unsafe ? "un" : ""); + if (unsafe) { + ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR); + spin_lock(&ci->i_unsafe_lock); + list_add_tail(&req->r_unsafe_item, + &ci->i_unsafe_writes); + spin_unlock(&ci->i_unsafe_lock); + } else { + spin_lock(&ci->i_unsafe_lock); + list_del_init(&req->r_unsafe_item); + spin_unlock(&ci->i_unsafe_lock); + ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR); + } } /* @@ -456,36 +487,33 @@ static void sync_write_commit(struct ceph_osd_request *req, * objects, rollback on failure, etc.) */ static ssize_t ceph_sync_write(struct file *file, const char __user *data, - size_t left, loff_t *offset) + size_t left, loff_t pos, loff_t *ppos) { - struct inode *inode = file->f_dentry->d_inode; + struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_snap_context *snapc; + struct ceph_vino vino; struct ceph_osd_request *req; + int num_ops = 1; struct page **pages; int num_pages; - long long unsigned pos; u64 len; int written = 0; int flags; - int do_sync = 0; int check_caps = 0; int page_align, io_align; unsigned long buf_align; int ret; struct timespec mtime = CURRENT_TIME; + bool own_pages = false; - if (ceph_snap(file->f_dentry->d_inode) != CEPH_NOSNAP) + if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) return -EROFS; - dout("sync_write on file %p %lld~%u %s\n", file, *offset, + dout("sync_write on file %p %lld~%u %s\n", file, pos, (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); - if (file->f_flags & O_APPEND) - pos = i_size_read(inode); - else - pos = *offset; - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); if (ret < 0) return ret; @@ -502,7 +530,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) flags |= CEPH_OSD_FLAG_ACK; else - do_sync = 1; + num_ops++; /* Also include a 'startsync' command. */ /* * we may need to do multiple writes here if we span an object @@ -512,25 +540,20 @@ more: io_align = pos & ~PAGE_MASK; buf_align = (unsigned long)data & ~PAGE_MASK; len = left; - if (file->f_flags & O_DIRECT) { - /* write from beginning of first page, regardless of - io alignment */ - page_align = (pos - io_align + buf_align) & ~PAGE_MASK; - num_pages = calc_pages_for((unsigned long)data, len); - } else { - page_align = pos & ~PAGE_MASK; - num_pages = calc_pages_for(pos, len); - } + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - ceph_vino(inode), pos, &len, - CEPH_OSD_OP_WRITE, flags, - ci->i_snap_realm->cached_context, - do_sync, + vino, pos, &len, num_ops, + CEPH_OSD_OP_WRITE, flags, snapc, ci->i_truncate_seq, ci->i_truncate_size, - &mtime, false, 2, page_align); - if (!req) - return -ENOMEM; + false); + if (IS_ERR(req)) + return PTR_ERR(req); + /* write from beginning of first page, regardless of io alignment */ + page_align = file->f_flags & O_DIRECT ? buf_align : io_align; + num_pages = calc_pages_for(page_align, len); if (file->f_flags & O_DIRECT) { pages = ceph_get_direct_page_vector(data, num_pages, false); if (IS_ERR(pages)) { @@ -558,36 +581,20 @@ more: if ((file->f_flags & O_SYNC) == 0) { /* get a second commit callback */ - req->r_safe_callback = sync_write_commit; - req->r_own_pages = 1; + req->r_unsafe_callback = ceph_sync_write_unsafe; + req->r_inode = inode; + own_pages = true; } } - req->r_pages = pages; - req->r_num_pages = num_pages; - req->r_inode = inode; + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, + false, own_pages); + + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); - if (!ret) { - if (req->r_safe_callback) { - /* - * Add to inode unsafe list only after we - * start_request so that a tid has been assigned. - */ - spin_lock(&ci->i_unsafe_lock); - list_add_tail(&req->r_unsafe_item, - &ci->i_unsafe_writes); - spin_unlock(&ci->i_unsafe_lock); - ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR); - } - + if (!ret) ret = ceph_osdc_wait_request(&fsc->client->osdc, req); - if (ret < 0 && req->r_safe_callback) { - spin_lock(&ci->i_unsafe_lock); - list_del_init(&req->r_unsafe_item); - spin_unlock(&ci->i_unsafe_lock); - ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR); - } - } if (file->f_flags & O_DIRECT) ceph_put_page_vector(pages, num_pages, false); @@ -600,12 +607,12 @@ out: pos += len; written += len; left -= len; - data += written; + data += len; if (left) goto more; ret = written; - *offset = pos; + *ppos = pos; if (pos > i_size_read(inode)) check_caps = ceph_inode_set_size(inode, pos); if (check_caps) @@ -629,7 +636,7 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov, struct ceph_file_info *fi = filp->private_data; loff_t *ppos = &iocb->ki_pos; size_t len = iov->iov_len; - struct inode *inode = filp->f_dentry->d_inode; + struct inode *inode = file_inode(filp); struct ceph_inode_info *ci = ceph_inode(inode); void __user *base = iov->iov_base; ssize_t ret; @@ -639,7 +646,6 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov, dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", inode, ceph_vinop(inode), pos, (unsigned)len, inode); again: - __ceph_do_pending_vmtruncate(inode); if (fi->fmode & CEPH_FILE_MODE_LAZY) want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; else @@ -699,69 +705,78 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov, { struct file *file = iocb->ki_filp; struct ceph_file_info *fi = file->private_data; - struct inode *inode = file->f_dentry->d_inode; + struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->client->osdc; - loff_t endoff = pos + iov->iov_len; - int want, got = 0; - int ret, err; + ssize_t count, written = 0; + int err, want, got; + bool hold_mutex; if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; + mutex_lock(&inode->i_mutex); + hold_mutex = true; + + err = generic_segment_checks(iov, &nr_segs, &count, VERIFY_READ); + if (err) + goto out; + + /* We can write back this queue in page reclaim */ + current->backing_dev_info = file->f_mapping->backing_dev_info; + + err = generic_write_checks(file, &pos, &count, S_ISBLK(inode->i_mode)); + if (err) + goto out; + + if (count == 0) + goto out; + + err = file_remove_suid(file); + if (err) + goto out; + + err = file_update_time(file); + if (err) + goto out; + retry_snap: - if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) - return -ENOSPC; - __ceph_do_pending_vmtruncate(inode); - dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n", - inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, - inode->i_size); + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) { + err = -ENOSPC; + goto out; + } + + dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n", + inode, ceph_vinop(inode), pos, count, inode->i_size); if (fi->fmode & CEPH_FILE_MODE_LAZY) want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; else want = CEPH_CAP_FILE_BUFFER; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); - if (ret < 0) - goto out_put; + got = 0; + err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count); + if (err < 0) + goto out; - dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n", - inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, - ceph_cap_string(got)); + dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n", + inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || (inode->i_sb->s_flags & MS_SYNCHRONOUS) || (fi->flags & CEPH_F_SYNC)) { - ret = ceph_sync_write(file, iov->iov_base, iov->iov_len, - &iocb->ki_pos); + mutex_unlock(&inode->i_mutex); + written = ceph_sync_write(file, iov->iov_base, count, + pos, &iocb->ki_pos); } else { - /* - * buffered write; drop Fw early to avoid slow - * revocation if we get stuck on balance_dirty_pages - */ - int dirty; - - spin_lock(&ci->i_ceph_lock); - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); - spin_unlock(&ci->i_ceph_lock); - ceph_put_cap_refs(ci, got); - - ret = generic_file_aio_write(iocb, iov, nr_segs, pos); - if ((ret >= 0 || ret == -EIOCBQUEUED) && - ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) - || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) { - err = vfs_fsync_range(file, pos, pos + ret - 1, 1); - if (err < 0) - ret = err; - } - - if (dirty) - __mark_inode_dirty(inode, dirty); - goto out; + written = generic_file_buffered_write(iocb, iov, nr_segs, + pos, &iocb->ki_pos, + count, 0); + mutex_unlock(&inode->i_mutex); } + hold_mutex = false; - if (ret >= 0) { + if (written >= 0) { int dirty; spin_lock(&ci->i_ceph_lock); dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); @@ -770,26 +785,38 @@ retry_snap: __mark_inode_dirty(inode, dirty); } -out_put: dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, ceph_cap_string(got)); ceph_put_cap_refs(ci, got); -out: - if (ret == -EOLDSNAPC) { + if (written >= 0 && + ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) { + err = vfs_fsync_range(file, pos, pos + written - 1, 1); + if (err < 0) + written = err; + } + + if (written == -EOLDSNAPC) { dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n", inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len); + mutex_lock(&inode->i_mutex); + hold_mutex = true; goto retry_snap; } +out: + if (hold_mutex) + mutex_unlock(&inode->i_mutex); + current->backing_dev_info = NULL; - return ret; + return written ? written : err; } /* * llseek. be sure to verify file size on SEEK_END. */ -static loff_t ceph_llseek(struct file *file, loff_t offset, int origin) +static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) { struct inode *inode = file->f_mapping->host; int ret; @@ -797,7 +824,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int origin) mutex_lock(&inode->i_mutex); __ceph_do_pending_vmtruncate(inode); - if (origin == SEEK_END || origin == SEEK_DATA || origin == SEEK_HOLE) { + if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) { ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); if (ret < 0) { offset = ret; @@ -805,7 +832,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int origin) } } - switch (origin) { + switch (whence) { case SEEK_END: offset += inode->i_size; break; @@ -837,16 +864,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int origin) break; } - if (offset < 0 || offset > inode->i_sb->s_maxbytes) { - offset = -EINVAL; - goto out; - } - - /* Special lock needed here? */ - if (offset != file->f_pos) { - file->f_pos = offset; - file->f_version = 0; - } + offset = vfs_setpos(file, offset, inode->i_sb->s_maxbytes); out: mutex_unlock(&inode->i_mutex); |