diff options
Diffstat (limited to 'fs/ceph/file.c')
-rw-r--r-- | fs/ceph/file.c | 235 |
1 files changed, 76 insertions, 159 deletions
diff --git a/fs/ceph/file.c b/fs/ceph/file.c index f995e3528a33..26cc95421cca 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -283,7 +283,7 @@ int ceph_open(struct inode *inode, struct file *file) spin_lock(&ci->i_ceph_lock); if (__ceph_is_any_real_caps(ci) && (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) { - int mds_wanted = __ceph_caps_mds_wanted(ci); + int mds_wanted = __ceph_caps_mds_wanted(ci, true); int issued = __ceph_caps_issued(ci, NULL); dout("open %p fmode %d want %s issued %s using existing\n", @@ -351,10 +351,6 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, if (dentry->d_name.len > NAME_MAX) return -ENAMETOOLONG; - err = ceph_init_dentry(dentry); - if (err < 0) - return err; - if (flags & O_CREAT) { err = ceph_pre_init_acls(dir, &mode, &acls); if (err < 0) @@ -383,7 +379,8 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, mask |= CEPH_CAP_XATTR_SHARED; req->r_args.open.mask = cpu_to_le32(mask); - req->r_locked_dir = dir; /* caller holds dir->i_mutex */ + req->r_parent = dir; + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); err = ceph_mdsc_do_request(mdsc, (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, req); @@ -458,71 +455,60 @@ enum { * only return a short read to the caller if we hit EOF. */ static int striped_read(struct inode *inode, - u64 off, u64 len, + u64 pos, u64 len, struct page **pages, int num_pages, - int *checkeof) + int page_align, int *checkeof) { struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); - u64 pos, this_len, left; + u64 this_len; loff_t i_size; - int page_align, pages_left; - int read, ret; - struct page **page_pos; + int page_idx; + int ret, read = 0; bool hit_stripe, was_short; /* * we may need to do multiple reads. not atomic, unfortunately. */ - pos = off; - left = len; - page_pos = pages; - pages_left = num_pages; - read = 0; - more: - page_align = pos & ~PAGE_MASK; - this_len = left; + this_len = len; + page_idx = (page_align + read) >> PAGE_SHIFT; ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), &ci->i_layout, pos, &this_len, - ci->i_truncate_seq, - ci->i_truncate_size, - page_pos, pages_left, page_align); + ci->i_truncate_seq, ci->i_truncate_size, + pages + page_idx, num_pages - page_idx, + ((page_align + read) & ~PAGE_MASK)); if (ret == -ENOENT) ret = 0; - hit_stripe = this_len < left; + hit_stripe = this_len < len; was_short = ret >= 0 && ret < this_len; - dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read, + dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read, ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); i_size = i_size_read(inode); if (ret >= 0) { - int didpages; if (was_short && (pos + ret < i_size)) { int zlen = min(this_len - ret, i_size - pos - ret); - int zoff = (off & ~PAGE_MASK) + read + ret; + int zoff = page_align + read + ret; dout(" zero gap %llu to %llu\n", - pos + ret, pos + ret + zlen); + pos + ret, pos + ret + zlen); ceph_zero_page_vector_range(zoff, zlen, pages); ret += zlen; } - didpages = (page_align + ret) >> PAGE_SHIFT; + read += ret; pos += ret; - read = pos - off; - left -= ret; - page_pos += didpages; - pages_left -= didpages; + len -= ret; /* hit stripe and need continue*/ - if (left && hit_stripe && pos < i_size) + if (len && hit_stripe && pos < i_size) goto more; } if (read > 0) { ret = read; /* did we bounce off eof? */ - if (pos + left > i_size) + if (pos + len > i_size) *checkeof = CHECK_EOF; } @@ -536,15 +522,16 @@ more: * * If the read spans object boundary, just do multiple reads. */ -static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, - int *checkeof) +static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, + int *checkeof) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct page **pages; u64 off = iocb->ki_pos; - int num_pages, ret; - size_t len = iov_iter_count(i); + int num_pages; + ssize_t ret; + size_t len = iov_iter_count(to); dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len, @@ -563,35 +550,56 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, if (ret < 0) return ret; - num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - ret = striped_read(inode, off, len, pages, - num_pages, checkeof); - if (ret > 0) { - int l, k = 0; - size_t left = ret; - - while (left) { - size_t page_off = off & ~PAGE_MASK; - size_t copy = min_t(size_t, left, - PAGE_SIZE - page_off); - l = copy_page_to_iter(pages[k++], page_off, copy, i); - off += l; - left -= l; - if (l < copy) - break; + if (unlikely(to->type & ITER_PIPE)) { + size_t page_off; + ret = iov_iter_get_pages_alloc(to, &pages, len, + &page_off); + if (ret <= 0) + return -ENOMEM; + num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE); + + ret = striped_read(inode, off, ret, pages, num_pages, + page_off, checkeof); + if (ret > 0) { + iov_iter_advance(to, ret); + off += ret; + } else { + iov_iter_advance(to, 0); } + ceph_put_page_vector(pages, num_pages, false); + } else { + num_pages = calc_pages_for(off, len); + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + ret = striped_read(inode, off, len, pages, num_pages, + (off & ~PAGE_MASK), checkeof); + if (ret > 0) { + int l, k = 0; + size_t left = ret; + + while (left) { + size_t page_off = off & ~PAGE_MASK; + size_t copy = min_t(size_t, left, + PAGE_SIZE - page_off); + l = copy_page_to_iter(pages[k++], page_off, + copy, to); + off += l; + left -= l; + if (l < copy) + break; + } + } + ceph_release_page_vector(pages, num_pages); } - ceph_release_page_vector(pages, num_pages); if (off > iocb->ki_pos) { ret = off - iocb->ki_pos; iocb->ki_pos = off; } - dout("sync_read result %d\n", ret); + dout("sync_read result %zd\n", ret); return ret; } @@ -751,9 +759,7 @@ static void ceph_aio_retry_work(struct work_struct *work) goto out; } - req->r_flags = CEPH_OSD_FLAG_ORDERSNAP | - CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE; + req->r_flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE; ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc); ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid); @@ -787,89 +793,6 @@ out: kfree(aio_work); } -/* - * Write commit request unsafe callback, called to tell us when a - * request is unsafe (that is, in flight--has been handed to the - * messenger to send to its target osd). It is called again when - * we've received a response message indicating the request is - * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request - * is completed early (and unsuccessfully) due to a timeout or - * interrupt. - * - * This is used if we requested both an ACK and ONDISK commit reply - * from the OSD. - */ -static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) -{ - struct ceph_inode_info *ci = ceph_inode(req->r_inode); - - dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid, - unsafe ? "un" : ""); - if (unsafe) { - ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR); - spin_lock(&ci->i_unsafe_lock); - list_add_tail(&req->r_unsafe_item, - &ci->i_unsafe_writes); - spin_unlock(&ci->i_unsafe_lock); - - complete_all(&req->r_completion); - } else { - spin_lock(&ci->i_unsafe_lock); - list_del_init(&req->r_unsafe_item); - spin_unlock(&ci->i_unsafe_lock); - ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR); - } -} - -/* - * Wait on any unsafe replies for the given inode. First wait on the - * newest request, and make that the upper bound. Then, if there are - * more requests, keep waiting on the oldest as long as it is still older - * than the original request. - */ -void ceph_sync_write_wait(struct inode *inode) -{ - struct ceph_inode_info *ci = ceph_inode(inode); - struct list_head *head = &ci->i_unsafe_writes; - struct ceph_osd_request *req; - u64 last_tid; - - if (!S_ISREG(inode->i_mode)) - return; - - spin_lock(&ci->i_unsafe_lock); - if (list_empty(head)) - goto out; - - /* set upper bound as _last_ entry in chain */ - - req = list_last_entry(head, struct ceph_osd_request, - r_unsafe_item); - last_tid = req->r_tid; - - do { - ceph_osdc_get_request(req); - spin_unlock(&ci->i_unsafe_lock); - - dout("sync_write_wait on tid %llu (until %llu)\n", - req->r_tid, last_tid); - wait_for_completion(&req->r_safe_completion); - ceph_osdc_put_request(req); - - spin_lock(&ci->i_unsafe_lock); - /* - * from here on look at first entry in chain, since we - * only want to wait for anything older than last_tid - */ - if (list_empty(head)) - break; - req = list_first_entry(head, struct ceph_osd_request, - r_unsafe_item); - } while (req->r_tid < last_tid); -out: - spin_unlock(&ci->i_unsafe_lock); -} - static ssize_t ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, struct ceph_snap_context *snapc, @@ -906,11 +829,9 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, pos >> PAGE_SHIFT, (pos + count) >> PAGE_SHIFT); if (ret2 < 0) - dout("invalidate_inode_pages2_range returned %d\n", ret); + dout("invalidate_inode_pages2_range returned %d\n", ret2); - flags = CEPH_OSD_FLAG_ORDERSNAP | - CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE; + flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE; } else { flags = CEPH_OSD_FLAG_READ; } @@ -1109,10 +1030,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, if (ret < 0) dout("invalidate_inode_pages2_range returned %d\n", ret); - flags = CEPH_OSD_FLAG_ORDERSNAP | - CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE | - CEPH_OSD_FLAG_ACK; + flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE; while ((len = iov_iter_count(from)) > 0) { size_t left; @@ -1158,8 +1076,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, goto out; } - /* get a second commit callback */ - req->r_unsafe_callback = ceph_sync_write_unsafe; req->r_inode = inode; osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, @@ -1249,8 +1165,9 @@ again: dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, ceph_cap_string(got)); - + current->journal_info = filp; ret = generic_file_read_iter(iocb, to); + current->journal_info = NULL; } dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); @@ -1608,8 +1525,7 @@ static int ceph_zero_partial_object(struct inode *inode, ceph_vino(inode), offset, length, 0, 1, op, - CEPH_OSD_FLAG_WRITE | - CEPH_OSD_FLAG_ONDISK, + CEPH_OSD_FLAG_WRITE, NULL, 0, 0, false); if (IS_ERR(req)) { ret = PTR_ERR(req); @@ -1770,6 +1686,7 @@ const struct file_operations ceph_file_fops = { .fsync = ceph_fsync, .lock = ceph_lock, .flock = ceph_flock, + .splice_read = generic_file_splice_read, .splice_write = iter_file_splice_write, .unlocked_ioctl = ceph_ioctl, .compat_ioctl = ceph_ioctl, |